diff --git a/swh/storage/db.py b/swh/storage/db.py index 6c7d17d0..91935b46 100644 --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -1,1292 +1,1287 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import random import select from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from swh.core.db import BaseDb from swh.core.db.db_utils import stored_procedure, jsonize from swh.core.db.db_utils import execute_values_generator from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE class Db(BaseDb): """Proxy to the SWH DB, with wrappers around stored procedures """ def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute( "SELECT swh_mktemp_dir_entry(%s)", (("directory_entry_%s" % entry_type),) ) @stored_procedure("swh_mktemp_revision") def mktemp_revision(self, cur=None): pass @stored_procedure("swh_mktemp_release") def mktemp_release(self, cur=None): pass @stored_procedure("swh_mktemp_snapshot_branch") def mktemp_snapshot_branch(self, cur=None): pass def register_listener(self, notify_queue, cur=None): """Register a listener for NOTIFY queue `notify_queue`""" self._cursor(cur).execute("LISTEN %s" % notify_queue) def listen_notifies(self, timeout): """Listen to notifications for `timeout` seconds""" if select.select([self.conn], [], [], timeout) == ([], [], []): return else: self.conn.poll() while self.conn.notifies: yield self.conn.notifies.pop(0) @stored_procedure("swh_content_add") def content_add_from_temp(self, cur=None): pass @stored_procedure("swh_directory_add") def directory_add_from_temp(self, cur=None): pass @stored_procedure("swh_skipped_content_add") def skipped_content_add_from_temp(self, cur=None): pass @stored_procedure("swh_revision_add") def revision_add_from_temp(self, cur=None): pass @stored_procedure("swh_release_add") def release_add_from_temp(self, cur=None): pass def content_update_from_temp(self, keys_to_update, cur=None): cur = self._cursor(cur) cur.execute( """select swh_content_update(ARRAY[%s] :: text[])""" % keys_to_update ) content_get_metadata_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", ] content_add_keys = content_get_metadata_keys + ["ctime"] skipped_content_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "reason", "status", "origin", ] def content_get_metadata_from_sha1s(self, sha1s, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ select t.sha1, %s from (values %%s) as t (sha1) inner join content using (sha1) """ % ", ".join(self.content_get_metadata_keys[1:]), ((sha1,) for sha1 in sha1s), ) def content_get_range(self, start, end, limit=None, cur=None): """Retrieve contents within range [start, end]. """ cur = self._cursor(cur) query = """select %s from content where %%s <= sha1 and sha1 <= %%s order by sha1 limit %%s""" % ", ".join( self.content_get_metadata_keys ) cur.execute(query, (start, end, limit)) yield from cur content_hash_keys = ["sha1", "sha1_git", "sha256", "blake2s256"] def content_missing_from_list(self, contents, cur=None): cur = self._cursor(cur) keys = ", ".join(self.content_hash_keys) equality = " AND ".join( ("t.%s = c.%s" % (key, key)) for key in self.content_hash_keys ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(%s) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE %s ) """ % (keys, keys, equality), (tuple(c[key] for key in self.content_hash_keys) for c in contents), ) def content_missing_per_sha1(self, sha1s, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT t.sha1 FROM (VALUES %s) AS t(sha1) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE c.sha1 = t.sha1 )""", ((sha1,) for sha1 in sha1s), ) def content_missing_per_sha1_git(self, contents, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT t.sha1_git FROM (VALUES %s) AS t(sha1_git) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE c.sha1_git = t.sha1_git )""", ((sha1,) for sha1 in contents), ) def skipped_content_missing(self, contents, cur=None): if not contents: return [] cur = self._cursor(cur) query = """SELECT * FROM (VALUES %s) AS t (%s) WHERE not exists (SELECT 1 FROM skipped_content s WHERE s.sha1 is not distinct from t.sha1::sha1 and s.sha1_git is not distinct from t.sha1_git::sha1 and s.sha256 is not distinct from t.sha256::bytea);""" % ( (", ".join("%s" for _ in contents)), ", ".join(self.content_hash_keys), ) cur.execute( query, [tuple(cont[key] for key in self.content_hash_keys) for cont in contents], ) yield from cur def snapshot_exists(self, snapshot_id, cur=None): """Check whether a snapshot with the given id exists""" cur = self._cursor(cur) cur.execute("""SELECT 1 FROM snapshot where id=%s""", (snapshot_id,)) return bool(cur.fetchone()) def snapshot_missing_from_list(self, snapshots, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM snapshot d WHERE d.id = t.id ) """, ((id,) for id in snapshots), ) def snapshot_add(self, snapshot_id, cur=None): """Add a snapshot from the temporary table""" cur = self._cursor(cur) cur.execute("""SELECT swh_snapshot_add(%s)""", (snapshot_id,)) snapshot_count_cols = ["target_type", "count"] def snapshot_count_branches(self, snapshot_id, cur=None): cur = self._cursor(cur) query = """\ SELECT %s FROM swh_snapshot_count_branches(%%s) """ % ", ".join( self.snapshot_count_cols ) cur.execute(query, (snapshot_id,)) yield from cur snapshot_get_cols = ["snapshot_id", "name", "target", "target_type"] def snapshot_get_by_id( self, snapshot_id, branches_from=b"", branches_count=None, target_types=None, cur=None, ): cur = self._cursor(cur) query = """\ SELECT %s FROM swh_snapshot_get_by_id(%%s, %%s, %%s, %%s :: snapshot_target[]) """ % ", ".join( self.snapshot_get_cols ) cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) yield from cur def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None): cur = self._cursor(cur) query = """\ SELECT ovs.snapshot FROM origin_visit ov INNER JOIN origin o ON o.id = ov.origin INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit WHERE o.url=%s AND ov.visit=%s ORDER BY ovs.date DESC LIMIT 1 """ cur.execute(query, (origin_url, visit_id)) ret = cur.fetchone() if ret: return ret[0] def snapshot_get_random(self, cur=None): return self._get_random_row_from_table("snapshot", ["id"], "id", cur) content_find_cols = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", ] def content_find( self, sha1=None, sha1_git=None, sha256=None, blake2s256=None, cur=None ): """Find the content optionally on a combination of the following checksums sha1, sha1_git, sha256 or blake2s256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content blake2s256: blake2s256 content Returns: The tuple (sha1, sha1_git, sha256, blake2s256) if found or None. """ cur = self._cursor(cur) checksum_dict = { "sha1": sha1, "sha1_git": sha1_git, "sha256": sha256, "blake2s256": blake2s256, } where_parts = [] args = [] # Adds only those keys which have value other than None for algorithm in checksum_dict: if checksum_dict[algorithm] is not None: args.append(checksum_dict[algorithm]) where_parts.append(algorithm + "= %s") query = " AND ".join(where_parts) cur.execute( """SELECT %s FROM content WHERE %s """ % (",".join(self.content_find_cols), query), args, ) content = cur.fetchall() return content def content_get_random(self, cur=None): return self._get_random_row_from_table("content", ["sha1_git"], "sha1_git", cur) def directory_missing_from_list(self, directories, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM directory d WHERE d.id = t.id ) """, ((id,) for id in directories), ) directory_ls_cols = [ "dir_id", "type", "target", "name", "perms", "status", "sha1", "sha1_git", "sha256", "length", ] def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_directory_walk_one(%%s)" % cols cur.execute(query, (directory,)) yield from cur def directory_walk(self, directory, cur=None): cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_directory_walk(%%s)" % cols cur.execute(query, (directory,)) yield from cur def directory_entry_get_by_path(self, directory, paths, cur=None): """Retrieve a directory entry by path. """ cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)" % cols cur.execute(query, (directory, paths)) data = cur.fetchone() if set(data) == {None}: return None return data def directory_get_random(self, cur=None): return self._get_random_row_from_table("directory", ["id"], "id", cur) def revision_missing_from_list(self, revisions, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM revision r WHERE r.id = t.id ) """, ((id,) for id in revisions), ) revision_add_cols = [ "id", "date", "date_offset", "date_neg_utc_offset", "committer_date", "committer_date_offset", "committer_date_neg_utc_offset", "type", "directory", "message", "author_fullname", "author_name", "author_email", "committer_fullname", "committer_name", "committer_email", "metadata", "synthetic", ] revision_get_cols = revision_add_cols + ["parents"] def origin_visit_add(self, origin, ts, type, cur=None): """Add a new origin_visit for origin origin at timestamp ts. Args: origin: origin concerned by the visit ts: the date of the visit type: type of loader for the visit Returns: The new visit index step for that origin """ cur = self._cursor(cur) self._cursor(cur).execute( "SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type) ) return cur.fetchone()[0] origin_visit_status_cols = [ "origin", "visit", "date", "status", "snapshot", "metadata", ] def origin_visit_status_add( self, visit_status: OriginVisitStatus, cur=None ) -> None: """Add new origin visit status """ assert self.origin_visit_status_cols[0] == "origin" assert self.origin_visit_status_cols[-1] == "metadata" cols = self.origin_visit_status_cols[1:-1] cur = self._cursor(cur) cur.execute( f"WITH origin_id as (select id from origin where url=%s) " f"INSERT INTO origin_visit_status " f"(origin, {', '.join(cols)}, metadata) " f"VALUES ((select id from origin_id), " f"{', '.join(['%s']*len(cols))}, %s) " f"ON CONFLICT (origin, visit, date) do nothing", [visit_status.origin] + [getattr(visit_status, key) for key in cols] + [jsonize(visit_status.metadata)], ) def origin_visit_add_with_id(self, origin_visit: OriginVisit, cur=None) -> None: """Insert origin visit when id are already set """ ov = origin_visit assert ov.visit is not None - # doing an extra query like this is way simpler than trying to join - # the origin id in the query below - origin_id = next(self.origin_id_get_by_url([ov.origin])) - origin_visit_cols = ["origin", "visit", "date", "type"] - cur = self._cursor(cur) - query = """INSERT INTO origin_visit ({cols}) VALUES ({values}) + origin_visit_cols = ["origin", "visit", "date", "type"] + query = """INSERT INTO origin_visit ({cols}) + VALUES ((select id from origin where url=%s), {values}) ON CONFLICT (origin, visit) DO NOTHING""".format( cols=", ".join(origin_visit_cols), - values=", ".join("%s" for col in origin_visit_cols), - ) - cur.execute( - query, (origin_id, ov.visit, ov.date, ov.type), + values=", ".join("%s" for col in origin_visit_cols[1:]), ) + cur.execute(query, (ov.origin, ov.visit, ov.date, ov.type)) origin_visit_get_cols = [ "origin", "visit", "date", "type", "status", "metadata", "snapshot", ] origin_visit_select_cols = [ "o.url AS origin", "ov.visit", "ov.date", "ov.type AS type", "ovs.status", "ovs.metadata", "ovs.snapshot", ] origin_visit_status_select_cols = [ "o.url AS origin", "ovs.visit", "ovs.date", "ovs.status", "ovs.snapshot", "ovs.metadata", ] def _make_origin_visit_status( self, row: Optional[Tuple[Any]] ) -> Optional[Dict[str, Any]]: """Make an origin_visit_status dict out of a row """ if not row: return None return dict(zip(self.origin_visit_status_cols, row)) def origin_visit_status_get_latest( self, origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, cur=None, ) -> Optional[Dict[str, Any]]: """Given an origin visit id, return its latest origin_visit_status """ cur = self._cursor(cur) query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_status_select_cols), "FROM origin_visit_status ovs ", "INNER JOIN origin o ON o.id = ovs.origin", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_url] query_parts.append("AND ovs.visit = %s") query_params.append(visit) if require_snapshot: query_parts.append("AND ovs.snapshot is not null") if allowed_statuses: query_parts.append("AND ovs.status IN %s") query_params.append(tuple(allowed_statuses)) query_parts.append("ORDER BY ovs.date DESC LIMIT 1") query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) row = cur.fetchone() return self._make_origin_visit_status(row) def origin_visit_get_all( self, origin_id, last_visit=None, order="asc", limit=None, cur=None ): """Retrieve all visits for origin with id origin_id. Args: origin_id: The occurrence's origin Yields: The visits for that origin """ cur = self._cursor(cur) assert order.lower() in ["asc", "desc"] query_parts = [ "SELECT DISTINCT ON (ov.visit) %s " % ", ".join(self.origin_visit_select_cols), "FROM origin_visit ov", "INNER JOIN origin o ON o.id = ov.origin", "INNER JOIN origin_visit_status ovs", "ON ov.origin = ovs.origin AND ov.visit = ovs.visit", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_id] if last_visit is not None: op_comparison = ">" if order == "asc" else "<" query_parts.append(f"and ov.visit {op_comparison} %s") query_params.append(last_visit) if order == "asc": query_parts.append("ORDER BY ov.visit ASC, ovs.date DESC") elif order == "desc": query_parts.append("ORDER BY ov.visit DESC, ovs.date DESC") else: assert False if limit is not None: query_parts.append("LIMIT %s") query_params.append(limit) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) yield from cur def origin_visit_get(self, origin_id, visit_id, cur=None): """Retrieve information on visit visit_id of origin origin_id. Args: origin_id: the origin concerned visit_id: The visit step for that origin Returns: The origin_visit information """ cur = self._cursor(cur) query = """\ SELECT %s FROM origin_visit ov INNER JOIN origin o ON o.id = ov.origin INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit WHERE o.url = %%s AND ov.visit = %%s ORDER BY ovs.date DESC LIMIT 1 """ % ( ", ".join(self.origin_visit_select_cols) ) cur.execute(query, (origin_id, visit_id)) r = cur.fetchall() if not r: return None return r[0] def origin_visit_find_by_date(self, origin, visit_date, cur=None): cur = self._cursor(cur) cur.execute( "SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date) ) rows = cur.fetchall() if rows: visit = dict(zip(self.origin_visit_get_cols, rows[0])) visit["origin"] = origin return visit def origin_visit_exists(self, origin_id, visit_id, cur=None): """Check whether an origin visit with the given ids exists""" cur = self._cursor(cur) query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" cur.execute(query, (origin_id, visit_id)) return bool(cur.fetchone()) def origin_visit_get_latest( self, origin_id: str, type: Optional[str], allowed_statuses: Optional[Iterable[str]], require_snapshot: bool, cur=None, ): """Retrieve the most recent origin_visit of the given origin, with optional filters. Args: origin_id: the origin concerned type: Optional visit type to filter on allowed_statuses: the visit statuses allowed for the returned visit require_snapshot (bool): If True, only a visit with a known snapshot will be returned. Returns: The origin_visit information, or None if no visit matches. """ cur = self._cursor(cur) query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_select_cols), "FROM origin_visit ov ", "INNER JOIN origin o ON o.id = ov.origin", "INNER JOIN origin_visit_status ovs ", "ON o.id = ovs.origin AND ov.visit = ovs.visit ", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_id] if type is not None: query_parts.append("AND ov.type = %s") query_params.append(type) if require_snapshot: query_parts.append("AND ovs.snapshot is not null") if allowed_statuses: query_parts.append("AND ovs.status IN %s") query_params.append(tuple(allowed_statuses)) query_parts.append( "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" ) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) r = cur.fetchone() if not r: return None return r def origin_visit_get_random(self, type, cur=None): """Randomly select one origin visit that was full and in the last 3 months """ cur = self._cursor(cur) columns = ",".join(self.origin_visit_select_cols) query = f"""select {columns} from origin_visit ov inner join origin o on ov.origin=o.id inner join origin_visit_status ovs on ov.origin = ovs.origin and ov.visit = ovs.visit where ovs.status='full' and ov.type=%s and ov.date > now() - '3 months'::interval and random() < 0.1 limit 1 """ cur.execute(query, (type,)) return cur.fetchone() @staticmethod def mangle_query_key(key, main_table): if key == "id": return "t.id" if key == "parents": return """ ARRAY( SELECT rh.parent_id::bytea FROM revision_history rh WHERE rh.id = t.id ORDER BY rh.parent_rank )""" if "_" not in key: return "%s.%s" % (main_table, key) head, tail = key.split("_", 1) if head in ("author", "committer") and tail in ( "name", "email", "id", "fullname", ): return "%s.%s" % (head, tail) return "%s.%s" % (main_table, key) def revision_get_from_list(self, revisions, cur=None): cur = self._cursor(cur) query_keys = ", ".join( self.mangle_query_key(k, "revision") for k in self.revision_get_cols ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(sortkey, id) LEFT JOIN revision ON t.id = revision.id LEFT JOIN person author ON revision.author = author.id LEFT JOIN person committer ON revision.committer = committer.id ORDER BY sortkey """ % query_keys, ((sortkey, id) for sortkey, id in enumerate(revisions)), ) def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_log(%%s, %%s) """ % ", ".join( self.revision_get_cols ) cur.execute(query, (root_revisions, limit)) yield from cur revision_shortlog_cols = ["id", "parents"] def revision_shortlog(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_list(%%s, %%s) """ % ", ".join( self.revision_shortlog_cols ) cur.execute(query, (root_revisions, limit)) yield from cur def revision_get_random(self, cur=None): return self._get_random_row_from_table("revision", ["id"], "id", cur) def release_missing_from_list(self, releases, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM release r WHERE r.id = t.id ) """, ((id,) for id in releases), ) object_find_by_sha1_git_cols = ["sha1_git", "type"] def object_find_by_sha1_git(self, ids, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ WITH t (sha1_git) AS (VALUES %s), known_objects as (( select id as sha1_git, 'release'::object_type as type, object_id from release r where exists (select 1 from t where t.sha1_git = r.id) ) union all ( select id as sha1_git, 'revision'::object_type as type, object_id from revision r where exists (select 1 from t where t.sha1_git = r.id) ) union all ( select id as sha1_git, 'directory'::object_type as type, object_id from directory d where exists (select 1 from t where t.sha1_git = d.id) ) union all ( select sha1_git as sha1_git, 'content'::object_type as type, object_id from content c where exists (select 1 from t where t.sha1_git = c.sha1_git) )) select t.sha1_git as sha1_git, k.type from t left join known_objects k on t.sha1_git = k.sha1_git """, ((id,) for id in ids), ) def stat_counters(self, cur=None): cur = self._cursor(cur) cur.execute("SELECT * FROM swh_stat_counters()") yield from cur def origin_add(self, url, cur=None): """Insert a new origin and return the new identifier.""" insert = """INSERT INTO origin (url) values (%s) RETURNING url""" cur.execute(insert, (url,)) return cur.fetchone()[0] origin_cols = ["url"] def origin_get_by_url(self, origins, cur=None): """Retrieve origin `(type, url)` from urls if found.""" cur = self._cursor(cur) query = """SELECT %s FROM (VALUES %%s) as t(url) LEFT JOIN origin ON t.url = origin.url """ % ",".join( "origin." + col for col in self.origin_cols ) yield from execute_values_generator(cur, query, ((url,) for url in origins)) def origin_get_by_sha1(self, sha1s, cur=None): """Retrieve origin urls from sha1s if found.""" cur = self._cursor(cur) query = """SELECT %s FROM (VALUES %%s) as t(sha1) LEFT JOIN origin ON t.sha1 = digest(origin.url, 'sha1') """ % ",".join( "origin." + col for col in self.origin_cols ) yield from execute_values_generator(cur, query, ((sha1,) for sha1 in sha1s)) def origin_id_get_by_url(self, origins, cur=None): """Retrieve origin `(type, url)` from urls if found.""" cur = self._cursor(cur) query = """SELECT id FROM (VALUES %s) as t(url) LEFT JOIN origin ON t.url = origin.url """ for row in execute_values_generator(cur, query, ((url,) for url in origins)): yield row[0] origin_get_range_cols = ["id", "url"] def origin_get_range(self, origin_from=1, origin_count=100, cur=None): """Retrieve ``origin_count`` origins whose ids are greater or equal than ``origin_from``. Origins are sorted by id before retrieving them. Args: origin_from (int): the minimum id of origins to retrieve origin_count (int): the maximum number of origins to retrieve """ cur = self._cursor(cur) query = """SELECT %s FROM origin WHERE id >= %%s ORDER BY id LIMIT %%s """ % ",".join( self.origin_get_range_cols ) cur.execute(query, (origin_from, origin_count)) yield from cur def _origin_query( self, url_pattern, count=False, offset=0, limit=50, regexp=False, with_visit=False, cur=None, ): """ Method factorizing query creation for searching and counting origins. """ cur = self._cursor(cur) if count: origin_cols = "COUNT(*)" else: origin_cols = ",".join(self.origin_cols) query = """SELECT %s FROM origin o WHERE """ if with_visit: query += """ EXISTS ( SELECT 1 FROM origin_visit ov INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit INNER JOIN snapshot ON ovs.snapshot=snapshot.id WHERE ov.origin=o.id ) AND """ query += "url %s %%s " if not count: query += "ORDER BY id OFFSET %%s LIMIT %%s" if not regexp: query = query % (origin_cols, "ILIKE") query_params = ("%" + url_pattern + "%", offset, limit) else: query = query % (origin_cols, "~*") query_params = (url_pattern, offset, limit) if count: query_params = (query_params[0],) cur.execute(query, query_params) def origin_search( self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, cur=None ): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. Args: url_pattern (str): the string pattern to search for in origin urls offset (int): number of found origins to skip before returning results limit (int): the maximum number of found origins to return regexp (bool): if True, consider the provided pattern as a regular expression and returns origins whose urls match it with_visit (bool): if True, filter out origins with no visit """ self._origin_query( url_pattern, offset=offset, limit=limit, regexp=regexp, with_visit=with_visit, cur=cur, ) yield from cur def origin_count(self, url_pattern, regexp=False, with_visit=False, cur=None): """Count origins whose urls contain a provided string pattern or match a provided regular expression. The pattern search in origin urls is performed in a case insensitive way. Args: url_pattern (str): the string pattern to search for in origin urls regexp (bool): if True, consider the provided pattern as a regular expression and returns origins whose urls match it with_visit (bool): if True, filter out origins with no visit """ self._origin_query( url_pattern, count=True, regexp=regexp, with_visit=with_visit, cur=cur ) return cur.fetchone()[0] release_add_cols = [ "id", "target", "target_type", "date", "date_offset", "date_neg_utc_offset", "name", "comment", "synthetic", "author_fullname", "author_name", "author_email", ] release_get_cols = release_add_cols def release_get_from_list(self, releases, cur=None): cur = self._cursor(cur) query_keys = ", ".join( self.mangle_query_key(k, "release") for k in self.release_get_cols ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(sortkey, id) LEFT JOIN release ON t.id = release.id LEFT JOIN person author ON release.author = author.id ORDER BY sortkey """ % query_keys, ((sortkey, id) for sortkey, id in enumerate(releases)), ) def release_get_random(self, cur=None): return self._get_random_row_from_table("release", ["id"], "id", cur) _object_metadata_context_cols = [ "origin", "visit", "snapshot", "release", "revision", "path", "directory", ] """The list of context columns for all artifact types.""" _object_metadata_insert_cols = [ "type", "id", "authority_id", "fetcher_id", "discovery_date", "format", "metadata", *_object_metadata_context_cols, ] """List of columns of the object_metadata table, used when writing metadata.""" _object_metadata_insert_query = f""" INSERT INTO object_metadata ({', '.join(_object_metadata_insert_cols)}) VALUES ({', '.join('%s' for _ in _object_metadata_insert_cols)}) ON CONFLICT (id, authority_id, discovery_date, fetcher_id) DO NOTHING """ object_metadata_get_cols = [ "id", "discovery_date", "metadata_authority.type", "metadata_authority.url", "metadata_fetcher.id", "metadata_fetcher.name", "metadata_fetcher.version", *_object_metadata_context_cols, "format", "metadata", ] """List of columns of the object_metadata, metadata_authority, and metadata_fetcher tables, used when reading object metadata.""" _object_metadata_select_query = f""" SELECT object_metadata.id AS id, {', '.join(object_metadata_get_cols[1:-1])}, object_metadata.metadata AS metadata FROM object_metadata INNER JOIN metadata_authority ON (metadata_authority.id=authority_id) INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) WHERE object_metadata.id=%s AND authority_id=%s """ def object_metadata_add( self, object_type: str, id: str, context: Dict[str, Union[str, bytes, int]], discovery_date: datetime.datetime, authority_id: int, fetcher_id: int, format: str, metadata: bytes, cur, ): query = self._object_metadata_insert_query args: Dict[str, Any] = dict( type=object_type, id=id, authority_id=authority_id, fetcher_id=fetcher_id, discovery_date=discovery_date, format=format, metadata=metadata, ) for col in self._object_metadata_context_cols: args[col] = context.get(col) params = [args[col] for col in self._object_metadata_insert_cols] cur.execute(query, params) def object_metadata_get( self, object_type: str, id: str, authority_id: int, after_time: Optional[datetime.datetime], after_fetcher: Optional[int], limit: int, cur, ): query_parts = [self._object_metadata_select_query] args = [id, authority_id] if after_fetcher is not None: assert after_time query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)") args.extend([after_time, after_fetcher]) elif after_time is not None: query_parts.append("AND discovery_date > %s") args.append(after_time) query_parts.append("ORDER BY discovery_date, fetcher_id") if limit: query_parts.append("LIMIT %s") args.append(limit) cur.execute(" ".join(query_parts), args) yield from cur metadata_fetcher_cols = ["name", "version", "metadata"] def metadata_fetcher_add( self, name: str, version: str, metadata: bytes, cur=None ) -> None: cur = self._cursor(cur) cur.execute( "INSERT INTO metadata_fetcher (name, version, metadata) " "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", (name, version, jsonize(metadata)), ) def metadata_fetcher_get(self, name: str, version: str, cur=None): cur = self._cursor(cur) cur.execute( f"SELECT {', '.join(self.metadata_fetcher_cols)} " f"FROM metadata_fetcher " f"WHERE name=%s AND version=%s", (name, version), ) return cur.fetchone() def metadata_fetcher_get_id( self, name: str, version: str, cur=None ) -> Optional[int]: cur = self._cursor(cur) cur.execute( "SELECT id FROM metadata_fetcher WHERE name=%s AND version=%s", (name, version), ) row = cur.fetchone() if row: return row[0] else: return None metadata_authority_cols = ["type", "url", "metadata"] def metadata_authority_add( self, type: str, url: str, metadata: bytes, cur=None ) -> None: cur = self._cursor(cur) cur.execute( "INSERT INTO metadata_authority (type, url, metadata) " "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", (type, url, jsonize(metadata)), ) def metadata_authority_get(self, type: str, url: str, cur=None): cur = self._cursor(cur) cur.execute( f"SELECT {', '.join(self.metadata_authority_cols)} " f"FROM metadata_authority " f"WHERE type=%s AND url=%s", (type, url), ) return cur.fetchone() def metadata_authority_get_id(self, type: str, url: str, cur=None) -> Optional[int]: cur = self._cursor(cur) cur.execute( "SELECT id FROM metadata_authority WHERE type=%s AND url=%s", (type, url) ) row = cur.fetchone() if row: return row[0] else: return None def _get_random_row_from_table(self, table_name, cols, id_col, cur=None): random_sha1 = bytes(random.randint(0, 255) for _ in range(SHA1_SIZE)) cur = self._cursor(cur) query = """ (SELECT {cols} FROM {table} WHERE {id_col} >= %s ORDER BY {id_col} LIMIT 1) UNION (SELECT {cols} FROM {table} WHERE {id_col} < %s ORDER BY {id_col} DESC LIMIT 1) LIMIT 1 """.format( cols=", ".join(cols), table=table_name, id_col=id_col ) cur.execute(query, (random_sha1, random_sha1)) row = cur.fetchone() if row: return row[0] diff --git a/swh/storage/storage.py b/swh/storage/storage.py index 8781fa32..56360a1d 100644 --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -1,1419 +1,1419 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import itertools from collections import defaultdict from contextlib import contextmanager from deprecated import deprecated from typing import Any, Dict, Iterable, List, Optional, Union import attr import psycopg2 import psycopg2.pool import psycopg2.errors from swh.core.api.serializers import msgpack_loads, msgpack_dumps from swh.model.model import ( Content, Directory, Origin, OriginVisit, OriginVisitStatus, Revision, Release, SkippedContent, Snapshot, SHA1_SIZE, ) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex from swh.storage.objstorage import ObjStorage from swh.storage.validate import VALIDATION_EXCEPTIONS from swh.storage.utils import now from . import converters from .extrinsic_metadata import ( check_extrinsic_metadata_context, CONTEXT_KEYS, ) from .common import db_transaction_generator, db_transaction from .db import Db from .exc import StorageArgumentException, StorageDBError, HashCollision from .algos import diff from .metrics import timed, send_metric, process_metrics from .utils import get_partition_bounds_bytes, extract_collision_hash from .writer import JournalWriter # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 EMPTY_SNAPSHOT_ID = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e") """Identifier for the empty snapshot""" VALIDATION_EXCEPTIONS = VALIDATION_EXCEPTIONS + [ psycopg2.errors.CheckViolation, psycopg2.errors.IntegrityError, psycopg2.errors.InvalidTextRepresentation, psycopg2.errors.NotNullViolation, psycopg2.errors.NumericValueOutOfRange, psycopg2.errors.UndefinedFunction, # (raised on wrong argument typs) ] """Exceptions raised by postgresql when validation of the arguments failed.""" @contextlib.contextmanager def convert_validation_exceptions(): """Catches postgresql errors related to invalid arguments, and re-raises a StorageArgumentException.""" try: yield except tuple(VALIDATION_EXCEPTIONS) as e: raise StorageArgumentException(str(e)) class Storage: """SWH storage proxy, encompassing DB and object storage """ def __init__( self, db, objstorage, min_pool_conns=1, max_pool_conns=10, journal_writer=None ): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection obj_root: path to the root of the object storage """ try: if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = Db(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db ) self._db = None except psycopg2.OperationalError as e: raise StorageDBError(e) self.journal_writer = JournalWriter(journal_writer) self.objstorage = ObjStorage(objstorage) def get_db(self): if self._db: return self._db else: return Db.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() @contextmanager def db(self): db = None try: db = self.get_db() yield db finally: if db: self.put_db(db) @timed @db_transaction() def check_config(self, *, check_write, db=None, cur=None): if not self.objstorage.check_config(check_write=check_write): return False # Check permissions on one of the tables if check_write: check = "INSERT" else: check = "SELECT" cur.execute("select has_table_privilege(current_user, 'content', %s)", (check,)) return cur.fetchone()[0] def _content_unique_key(self, hash, db): """Given a hash (tuple or dict), return a unique key from the aggregation of keys. """ keys = db.content_hash_keys if isinstance(hash, tuple): return hash return tuple([hash[k] for k in keys]) def _content_add_metadata(self, db, cur, content): """Add content to the postgresql database but not the object storage. """ # create temporary table for metadata injection db.mktemp("content", cur) db.copy_to( (c.to_dict() for c in content), "tmp_content", db.content_add_keys, cur ) # move metadata in place try: db.content_add_from_temp(cur) except psycopg2.IntegrityError as e: if e.diag.sqlstate == "23505" and e.diag.table_name == "content": message_detail = e.diag.message_detail if message_detail: hash_name, hash_id = extract_collision_hash(message_detail) collision_contents_hashes = [ c.hashes() for c in content if c.get_hash(hash_name) == hash_id ] else: constraint_to_hash_name = { "content_pkey": "sha1", "content_sha1_git_idx": "sha1_git", "content_sha256_idx": "sha256", } hash_name = constraint_to_hash_name.get(e.diag.constraint_name) hash_id = None collision_contents_hashes = None raise HashCollision( hash_name, hash_id, collision_contents_hashes ) from None else: raise @timed @process_metrics def content_add(self, content: Iterable[Content]) -> Dict: ctime = now() contents = [attr.evolve(c, ctime=ctime) for c in content] objstorage_summary = self.objstorage.content_add(contents) with self.db() as db: with db.transaction() as cur: missing = list( self.content_missing( map(Content.to_dict, contents), key_hash="sha1_git", db=db, cur=cur, ) ) contents = [c for c in contents if c.sha1_git in missing] self.journal_writer.content_add(contents) self._content_add_metadata(db, cur, contents) return { "content:add": len(contents), "content:add:bytes": objstorage_summary["content:add:bytes"], } @timed @db_transaction() def content_update(self, content, keys=[], db=None, cur=None): # TODO: Add a check on input keys. How to properly implement # this? We don't know yet the new columns. self.journal_writer.content_update(content) db.mktemp("content", cur) select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) with convert_validation_exceptions(): db.copy_to(content, "tmp_content", select_keys, cur) db.content_update_from_temp(keys_to_update=keys, cur=cur) @timed @process_metrics @db_transaction() def content_add_metadata( self, content: Iterable[Content], db=None, cur=None ) -> Dict: contents = list(content) missing = self.content_missing( (c.to_dict() for c in contents), key_hash="sha1_git", db=db, cur=cur, ) contents = [c for c in contents if c.sha1_git in missing] self.journal_writer.content_add_metadata(contents) self._content_add_metadata(db, cur, contents) return { "content:add": len(contents), } @timed def content_get(self, content): # FIXME: Make this method support slicing the `data`. if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise StorageArgumentException( "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX ) yield from self.objstorage.content_get(content) @timed @db_transaction() def content_get_range(self, start, end, limit=1000, db=None, cur=None): if limit is None: raise StorageArgumentException("limit should not be None") contents = [] next_content = None for counter, content_row in enumerate( db.content_get_range(start, end, limit + 1, cur) ): content = dict(zip(db.content_get_metadata_keys, content_row)) if counter >= limit: # take the last commit for the next page starting from this next_content = content["sha1"] break contents.append(content) return { "contents": contents, "next": next_content, } @timed def content_get_partition( self, partition_id: int, nb_partitions: int, limit: int = 1000, page_token: str = None, ): if limit is None: raise StorageArgumentException("limit should not be None") (start, end) = get_partition_bounds_bytes( partition_id, nb_partitions, SHA1_SIZE ) if page_token: start = hash_to_bytes(page_token) if end is None: end = b"\xff" * SHA1_SIZE result = self.content_get_range(start, end, limit) result2 = { "contents": result["contents"], "next_page_token": None, } if result["next"]: result2["next_page_token"] = hash_to_hex(result["next"]) return result2 @timed @db_transaction(statement_timeout=500) def content_get_metadata( self, contents: List[bytes], db=None, cur=None ) -> Dict[bytes, List[Dict]]: result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents} for row in db.content_get_metadata_from_sha1s(contents, cur): content_meta = dict(zip(db.content_get_metadata_keys, row)) result[content_meta["sha1"]].append(content_meta) return result @timed @db_transaction_generator() def content_missing(self, content, key_hash="sha1", db=None, cur=None): keys = db.content_hash_keys if key_hash not in keys: raise StorageArgumentException("key_hash should be one of %s" % keys) key_hash_idx = keys.index(key_hash) if not content: return for obj in db.content_missing_from_list(content, cur): yield obj[key_hash_idx] @timed @db_transaction_generator() def content_missing_per_sha1(self, contents, db=None, cur=None): for obj in db.content_missing_per_sha1(contents, cur): yield obj[0] @timed @db_transaction_generator() def content_missing_per_sha1_git(self, contents, db=None, cur=None): for obj in db.content_missing_per_sha1_git(contents, cur): yield obj[0] @timed @db_transaction() def content_find(self, content, db=None, cur=None): if not set(content).intersection(DEFAULT_ALGORITHMS): raise StorageArgumentException( "content keys must contain at least one of: " "sha1, sha1_git, sha256, blake2s256" ) contents = db.content_find( sha1=content.get("sha1"), sha1_git=content.get("sha1_git"), sha256=content.get("sha256"), blake2s256=content.get("blake2s256"), cur=cur, ) return [dict(zip(db.content_find_cols, content)) for content in contents] @timed @db_transaction() def content_get_random(self, db=None, cur=None): return db.content_get_random(cur) @staticmethod def _skipped_content_normalize(d): d = d.copy() if d.get("status") is None: d["status"] = "absent" if d.get("length") is None: d["length"] = -1 return d @staticmethod def _skipped_content_validate(d): """Sanity checks on status / reason / length, that postgresql doesn't enforce.""" if d["status"] != "absent": raise StorageArgumentException( "Invalid content status: {}".format(d["status"]) ) if d.get("reason") is None: raise StorageArgumentException( "Must provide a reason if content is absent." ) if d["length"] < -1: raise StorageArgumentException("Content length must be positive or -1.") def _skipped_content_add_metadata(self, db, cur, content: Iterable[SkippedContent]): origin_ids = db.origin_id_get_by_url([cont.origin for cont in content], cur=cur) content = [ attr.evolve(c, origin=origin_id) for (c, origin_id) in zip(content, origin_ids) ] db.mktemp("skipped_content", cur) db.copy_to( [c.to_dict() for c in content], "tmp_skipped_content", db.skipped_content_keys, cur, ) # move metadata in place db.skipped_content_add_from_temp(cur) @timed @process_metrics @db_transaction() def skipped_content_add( self, content: Iterable[SkippedContent], db=None, cur=None ) -> Dict: ctime = now() content = [attr.evolve(c, ctime=ctime) for c in content] missing_contents = self.skipped_content_missing( (c.to_dict() for c in content), db=db, cur=cur, ) content = [ c for c in content if any( all( c.get_hash(algo) == missing_content.get(algo) for algo in DEFAULT_ALGORITHMS ) for missing_content in missing_contents ) ] self.journal_writer.skipped_content_add(content) self._skipped_content_add_metadata(db, cur, content) return { "skipped_content:add": len(content), } @timed @db_transaction_generator() def skipped_content_missing(self, contents, db=None, cur=None): contents = list(contents) for content in db.skipped_content_missing(contents, cur): yield dict(zip(db.content_hash_keys, content)) @timed @process_metrics @db_transaction() def directory_add( self, directories: Iterable[Directory], db=None, cur=None ) -> Dict: directories = list(directories) summary = {"directory:add": 0} dirs = set() dir_entries: Dict[str, defaultdict] = { "file": defaultdict(list), "dir": defaultdict(list), "rev": defaultdict(list), } for cur_dir in directories: dir_id = cur_dir.id dirs.add(dir_id) for src_entry in cur_dir.entries: entry = src_entry.to_dict() entry["dir_id"] = dir_id dir_entries[entry["type"]][dir_id].append(entry) dirs_missing = set(self.directory_missing(dirs, db=db, cur=cur)) if not dirs_missing: return summary self.journal_writer.directory_add( dir_ for dir_ in directories if dir_.id in dirs_missing ) # Copy directory ids dirs_missing_dict = ({"id": dir} for dir in dirs_missing) db.mktemp("directory", cur) db.copy_to(dirs_missing_dict, "tmp_directory", ["id"], cur) # Copy entries for entry_type, entry_list in dir_entries.items(): entries = itertools.chain.from_iterable( entries_for_dir for dir_id, entries_for_dir in entry_list.items() if dir_id in dirs_missing ) db.mktemp_dir_entry(entry_type) db.copy_to( entries, "tmp_directory_entry_%s" % entry_type, ["target", "name", "perms", "dir_id"], cur, ) # Do the final copy db.directory_add_from_temp(cur) summary["directory:add"] = len(dirs_missing) return summary @timed @db_transaction_generator() def directory_missing(self, directories, db=None, cur=None): for obj in db.directory_missing_from_list(directories, cur): yield obj[0] @timed @db_transaction_generator(statement_timeout=20000) def directory_ls(self, directory, recursive=False, db=None, cur=None): if recursive: res_gen = db.directory_walk(directory, cur=cur) else: res_gen = db.directory_walk_one(directory, cur=cur) for line in res_gen: yield dict(zip(db.directory_ls_cols, line)) @timed @db_transaction(statement_timeout=2000) def directory_entry_get_by_path(self, directory, paths, db=None, cur=None): res = db.directory_entry_get_by_path(directory, paths, cur) if res: return dict(zip(db.directory_ls_cols, res)) @timed @db_transaction() def directory_get_random(self, db=None, cur=None): return db.directory_get_random(cur) @timed @process_metrics @db_transaction() def revision_add(self, revisions: Iterable[Revision], db=None, cur=None) -> Dict: revisions = list(revisions) summary = {"revision:add": 0} revisions_missing = set( self.revision_missing( set(revision.id for revision in revisions), db=db, cur=cur ) ) if not revisions_missing: return summary db.mktemp_revision(cur) revisions_filtered = [ revision for revision in revisions if revision.id in revisions_missing ] self.journal_writer.revision_add(revisions_filtered) revisions_filtered = list(map(converters.revision_to_db, revisions_filtered)) parents_filtered: List[bytes] = [] with convert_validation_exceptions(): db.copy_to( revisions_filtered, "tmp_revision", db.revision_add_cols, cur, lambda rev: parents_filtered.extend(rev["parents"]), ) db.revision_add_from_temp(cur) db.copy_to( parents_filtered, "revision_history", ["id", "parent_id", "parent_rank"], cur, ) return {"revision:add": len(revisions_missing)} @timed @db_transaction_generator() def revision_missing(self, revisions, db=None, cur=None): if not revisions: return for obj in db.revision_missing_from_list(revisions, cur): yield obj[0] @timed @db_transaction_generator(statement_timeout=1000) def revision_get(self, revisions, db=None, cur=None): for line in db.revision_get_from_list(revisions, cur): data = converters.db_to_revision(dict(zip(db.revision_get_cols, line))) if not data["type"]: yield None continue yield data @timed @db_transaction_generator(statement_timeout=2000) def revision_log(self, revisions, limit=None, db=None, cur=None): for line in db.revision_log(revisions, limit, cur): data = converters.db_to_revision(dict(zip(db.revision_get_cols, line))) if not data["type"]: yield None continue yield data @timed @db_transaction_generator(statement_timeout=2000) def revision_shortlog(self, revisions, limit=None, db=None, cur=None): yield from db.revision_shortlog(revisions, limit, cur) @timed @db_transaction() def revision_get_random(self, db=None, cur=None): return db.revision_get_random(cur) @timed @process_metrics @db_transaction() def release_add(self, releases: Iterable[Release], db=None, cur=None) -> Dict: releases = list(releases) summary = {"release:add": 0} release_ids = set(release.id for release in releases) releases_missing = set(self.release_missing(release_ids, db=db, cur=cur)) if not releases_missing: return summary db.mktemp_release(cur) releases_filtered = [ release for release in releases if release.id in releases_missing ] self.journal_writer.release_add(releases_filtered) releases_filtered = list(map(converters.release_to_db, releases_filtered)) with convert_validation_exceptions(): db.copy_to(releases_filtered, "tmp_release", db.release_add_cols, cur) db.release_add_from_temp(cur) return {"release:add": len(releases_missing)} @timed @db_transaction_generator() def release_missing(self, releases, db=None, cur=None): if not releases: return for obj in db.release_missing_from_list(releases, cur): yield obj[0] @timed @db_transaction_generator(statement_timeout=500) def release_get(self, releases, db=None, cur=None): for release in db.release_get_from_list(releases, cur): data = converters.db_to_release(dict(zip(db.release_get_cols, release))) yield data if data["target_type"] else None @timed @db_transaction() def release_get_random(self, db=None, cur=None): return db.release_get_random(cur) @timed @process_metrics @db_transaction() def snapshot_add(self, snapshots: Iterable[Snapshot], db=None, cur=None) -> Dict: created_temp_table = False count = 0 for snapshot in snapshots: if not db.snapshot_exists(snapshot.id, cur): if not created_temp_table: db.mktemp_snapshot_branch(cur) created_temp_table = True with convert_validation_exceptions(): db.copy_to( ( { "name": name, "target": info.target if info else None, "target_type": ( info.target_type.value if info else None ), } for name, info in snapshot.branches.items() ), "tmp_snapshot_branch", ["name", "target", "target_type"], cur, ) self.journal_writer.snapshot_add([snapshot]) db.snapshot_add(snapshot.id, cur) count += 1 return {"snapshot:add": count} @timed @db_transaction_generator() def snapshot_missing(self, snapshots, db=None, cur=None): for obj in db.snapshot_missing_from_list(snapshots, cur): yield obj[0] @timed @db_transaction(statement_timeout=2000) def snapshot_get(self, snapshot_id, db=None, cur=None): return self.snapshot_get_branches(snapshot_id, db=db, cur=cur) @timed @db_transaction(statement_timeout=2000) def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None): snapshot_id = db.snapshot_get_by_origin_visit(origin, visit, cur) if snapshot_id: return self.snapshot_get(snapshot_id, db=db, cur=cur) return None @timed @db_transaction(statement_timeout=2000) def snapshot_count_branches(self, snapshot_id, db=None, cur=None): return dict([bc for bc in db.snapshot_count_branches(snapshot_id, cur)]) @timed @db_transaction(statement_timeout=2000) def snapshot_get_branches( self, snapshot_id, branches_from=b"", branches_count=1000, target_types=None, db=None, cur=None, ): if snapshot_id == EMPTY_SNAPSHOT_ID: return { "id": snapshot_id, "branches": {}, "next_branch": None, } branches = {} next_branch = None fetched_branches = list( db.snapshot_get_by_id( snapshot_id, branches_from=branches_from, branches_count=branches_count + 1, target_types=target_types, cur=cur, ) ) for branch in fetched_branches[:branches_count]: branch = dict(zip(db.snapshot_get_cols, branch)) del branch["snapshot_id"] name = branch.pop("name") if branch == {"target": None, "target_type": None}: branch = None branches[name] = branch if len(fetched_branches) > branches_count: branch = dict(zip(db.snapshot_get_cols, fetched_branches[-1])) next_branch = branch["name"] if branches: return { "id": snapshot_id, "branches": branches, "next_branch": next_branch, } return None @timed @db_transaction() def snapshot_get_random(self, db=None, cur=None): return db.snapshot_get_random(cur) @timed @db_transaction() def origin_visit_add( self, visits: Iterable[OriginVisit], db=None, cur=None ) -> Iterable[OriginVisit]: for visit in visits: origin = self.origin_get({"url": visit.origin}, db=db, cur=cur) if not origin: # Cannot add a visit without an origin raise StorageArgumentException("Unknown origin %s", visit.origin) all_visits = [] nb_visits = 0 for visit in visits: nb_visits += 1 if not visit.visit: with convert_validation_exceptions(): visit_id = db.origin_visit_add( visit.origin, visit.date, visit.type, cur=cur ) visit = attr.evolve(visit, visit=visit_id) else: - db.origin_visit_add_with_id(visit) + db.origin_visit_add_with_id(visit, cur=cur) assert visit.visit is not None all_visits.append(visit) # Forced to write after for the case when the visit has no id self.journal_writer.origin_visit_add([visit]) visit_status = OriginVisitStatus( origin=visit.origin, visit=visit.visit, date=visit.date, status="created", snapshot=None, ) self._origin_visit_status_add(visit_status, db=db, cur=cur) send_metric("origin_visit:add", count=nb_visits, method_name="origin_visit") return all_visits def _origin_visit_status_add( self, visit_status: OriginVisitStatus, db, cur ) -> None: """Add an origin visit status""" self.journal_writer.origin_visit_status_add([visit_status]) db.origin_visit_status_add(visit_status, cur=cur) send_metric( "origin_visit_status:add", count=1, method_name="origin_visit_status" ) @timed @db_transaction() def origin_visit_status_add( self, visit_statuses: Iterable[OriginVisitStatus], db=None, cur=None, ) -> None: # First round to check existence (fail early if any is ko) for visit_status in visit_statuses: origin_url = self.origin_get({"url": visit_status.origin}, db=db, cur=cur) if not origin_url: raise StorageArgumentException(f"Unknown origin {visit_status.origin}") for visit_status in visit_statuses: self._origin_visit_status_add(visit_status, db, cur) @timed @db_transaction() def origin_visit_status_get_latest( self, origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, db=None, cur=None, ) -> Optional[OriginVisitStatus]: row = db.origin_visit_status_get_latest( origin_url, visit, allowed_statuses, require_snapshot, cur=cur ) if not row: return None return OriginVisitStatus.from_dict(row) def _origin_visit_apply_update( self, visit: Dict[str, Any], db, cur=None ) -> Dict[str, Any]: """Retrieve the latest visit status information for the origin visit. Then merge it with the visit and return it. """ visit_status = db.origin_visit_status_get_latest( visit["origin"], visit["visit"], cur=cur ) return { # default to the values in visit **visit, # override with the last update **visit_status, # visit['origin'] is the URL (via a join), while # visit_status['origin'] is only an id. "origin": visit["origin"], # but keep the date of the creation of the origin visit "date": visit["date"], } @timed @db_transaction_generator(statement_timeout=500) def origin_visit_get( self, origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None, order: str = "asc", db=None, cur=None, ) -> Iterable[Dict[str, Any]]: assert order in ["asc", "desc"] lines = db.origin_visit_get_all( origin, last_visit=last_visit, limit=limit, order=order, cur=cur ) for line in lines: visit = dict(zip(db.origin_visit_get_cols, line)) yield self._origin_visit_apply_update(visit, db) @timed @db_transaction(statement_timeout=500) def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime, db=None, cur=None ) -> Optional[Dict[str, Any]]: visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur) if visit: return self._origin_visit_apply_update(visit, db) return None @timed @db_transaction(statement_timeout=500) def origin_visit_get_by( self, origin: str, visit: int, db=None, cur=None ) -> Optional[Dict[str, Any]]: row = db.origin_visit_get(origin, visit, cur) if row: visit_dict = dict(zip(db.origin_visit_get_cols, row)) return self._origin_visit_apply_update(visit_dict, db) return None @timed @db_transaction(statement_timeout=4000) def origin_visit_get_latest( self, origin: str, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, db=None, cur=None, ) -> Optional[Dict[str, Any]]: row = db.origin_visit_get_latest( origin, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, cur=cur, ) if row: visit = dict(zip(db.origin_visit_get_cols, row)) return self._origin_visit_apply_update(visit, db) return None @timed @db_transaction() def origin_visit_get_random( self, type: str, db=None, cur=None ) -> Optional[Dict[str, Any]]: row = db.origin_visit_get_random(type, cur) if row: visit = dict(zip(db.origin_visit_get_cols, row)) return self._origin_visit_apply_update(visit, db) return None @timed @db_transaction(statement_timeout=2000) def object_find_by_sha1_git(self, ids, db=None, cur=None): ret = {id: [] for id in ids} for retval in db.object_find_by_sha1_git(ids, cur=cur): if retval[1]: ret[retval[0]].append( dict(zip(db.object_find_by_sha1_git_cols, retval)) ) return ret @timed @db_transaction(statement_timeout=500) def origin_get(self, origins, db=None, cur=None): if isinstance(origins, dict): # Old API return_single = True origins = [origins] elif len(origins) == 0: return [] else: return_single = False origin_urls = [origin["url"] for origin in origins] results = db.origin_get_by_url(origin_urls, cur) results = [dict(zip(db.origin_cols, result)) for result in results] if return_single: assert len(results) == 1 if results[0]["url"] is not None: return results[0] else: return None else: return [None if res["url"] is None else res for res in results] @timed @db_transaction_generator(statement_timeout=500) def origin_get_by_sha1(self, sha1s, db=None, cur=None): for line in db.origin_get_by_sha1(sha1s, cur): if line[0] is not None: yield dict(zip(db.origin_cols, line)) else: yield None @timed @db_transaction_generator() def origin_get_range(self, origin_from=1, origin_count=100, db=None, cur=None): for origin in db.origin_get_range(origin_from, origin_count, cur): yield dict(zip(db.origin_get_range_cols, origin)) @timed @db_transaction() def origin_list( self, page_token: Optional[str] = None, limit: int = 100, *, db=None, cur=None ) -> dict: page_token = page_token or "0" if not isinstance(page_token, str): raise StorageArgumentException("page_token must be a string.") origin_from = int(page_token) result: Dict[str, Any] = { "origins": [ dict(zip(db.origin_get_range_cols, origin)) for origin in db.origin_get_range(origin_from, limit, cur) ], } assert len(result["origins"]) <= limit if len(result["origins"]) == limit: result["next_page_token"] = str(result["origins"][limit - 1]["id"] + 1) for origin in result["origins"]: del origin["id"] return result @timed @db_transaction_generator() def origin_search( self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, db=None, cur=None, ): for origin in db.origin_search( url_pattern, offset, limit, regexp, with_visit, cur ): yield dict(zip(db.origin_cols, origin)) @timed @db_transaction() def origin_count( self, url_pattern, regexp=False, with_visit=False, db=None, cur=None ): return db.origin_count(url_pattern, regexp, with_visit, cur) @timed @db_transaction() def origin_add( self, origins: Iterable[Origin], db=None, cur=None ) -> Dict[str, int]: urls = [o.url for o in origins] known_origins = set(url for (url,) in db.origin_get_by_url(urls, cur)) # use lists here to keep origins sorted; some tests depend on this to_add = [url for url in urls if url not in known_origins] self.journal_writer.origin_add([Origin(url=url) for url in to_add]) added = 0 for url in to_add: if db.origin_add(url, cur): added += 1 return {"origin:add": added} @deprecated("Use origin_add([origin]) instead") @timed @db_transaction() def origin_add_one(self, origin: Origin, db=None, cur=None) -> str: stats = self.origin_add([origin]) if stats.get("origin:add", 0): send_metric("origin:add", count=1, method_name="origin_add_one") return origin.url @db_transaction(statement_timeout=500) def stat_counters(self, db=None, cur=None): return {k: v for (k, v) in db.stat_counters()} @db_transaction() def refresh_stat_counters(self, db=None, cur=None): keys = [ "content", "directory", "directory_entry_dir", "directory_entry_file", "directory_entry_rev", "origin", "origin_visit", "person", "release", "revision", "revision_history", "skipped_content", "snapshot", ] for key in keys: cur.execute("select * from swh_update_counter(%s)", (key,)) @timed @db_transaction() def content_metadata_add( self, id: str, context: Dict[str, Union[str, bytes, int]], discovery_date: datetime.datetime, authority: Dict[str, Any], fetcher: Dict[str, Any], format: str, metadata: bytes, db=None, cur=None, ) -> None: self._object_metadata_add( "content", id, context, discovery_date, authority, fetcher, format, metadata, db, cur, ) @timed @db_transaction() def content_metadata_get( self, id: str, authority: Dict[str, str], after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, db=None, cur=None, ) -> Dict[str, Any]: return self._object_metadata_get( "content", id, authority, after, page_token, limit, db, cur ) @timed @db_transaction() def origin_metadata_add( self, origin_url: str, discovery_date: datetime.datetime, authority: Dict[str, Any], fetcher: Dict[str, Any], format: str, metadata: bytes, db=None, cur=None, ) -> None: context: Dict[str, Union[str, bytes, int]] = {} # origins have no context self._object_metadata_add( "origin", origin_url, context, discovery_date, authority, fetcher, format, metadata, db, cur, ) @timed @db_transaction(statement_timeout=500) def origin_metadata_get( self, origin_url: str, authority: Dict[str, str], after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, db=None, cur=None, ) -> Dict[str, Any]: result = self._object_metadata_get( "origin", origin_url, authority, after, page_token, limit, db, cur ) for res in result["results"]: res.pop("id") res["origin_url"] = origin_url return result def _object_metadata_add( self, object_type: str, id: str, context: Dict[str, Union[str, bytes, int]], discovery_date: datetime.datetime, authority: Dict[str, Any], fetcher: Dict[str, Any], format: str, metadata: bytes, db, cur, ) -> None: check_extrinsic_metadata_context(object_type, context) authority_id = self._get_authority_id(authority, db, cur) fetcher_id = self._get_fetcher_id(fetcher, db, cur) if not isinstance(metadata, bytes): raise StorageArgumentException( "metadata must be bytes, not %r" % (metadata,) ) db.object_metadata_add( object_type, id, context, discovery_date, authority_id, fetcher_id, format, metadata, cur, ) send_metric( f"{object_type}_metadata:add", count=1, method_name=f"{object_type}_metadata_add", ) def _object_metadata_get( self, object_type: str, id: str, authority: Dict[str, str], after: Optional[datetime.datetime], page_token: Optional[bytes], limit: int, db, cur, ) -> Dict[str, Any]: if page_token: (after_time, after_fetcher) = msgpack_loads(page_token) if after and after_time < after: raise StorageArgumentException( "page_token is inconsistent with the value of 'after'." ) else: after_time = after after_fetcher = None authority_id = db.metadata_authority_get_id( authority["type"], authority["url"], cur ) if not authority_id: return { "next_page_token": None, "results": [], } rows = db.object_metadata_get( object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur ) rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows] results = [] for row in rows: row = row.copy() row.pop("metadata_fetcher.id") context = {} for key in CONTEXT_KEYS[object_type]: value = row[key] if value is not None: context[key] = value result = { "id": row["id"], "authority": { "type": row.pop("metadata_authority.type"), "url": row.pop("metadata_authority.url"), }, "fetcher": { "name": row.pop("metadata_fetcher.name"), "version": row.pop("metadata_fetcher.version"), }, "discovery_date": row["discovery_date"], "format": row["format"], "metadata": row["metadata"], } if CONTEXT_KEYS[object_type]: result["context"] = context results.append(result) if len(results) > limit: results.pop() assert len(results) == limit last_returned_row = rows[-2] # rows[-1] corresponds to the popped result next_page_token: Optional[bytes] = msgpack_dumps( ( last_returned_row["discovery_date"], last_returned_row["metadata_fetcher.id"], ) ) else: next_page_token = None return { "next_page_token": next_page_token, "results": results, } @timed @db_transaction() def metadata_fetcher_add( self, name: str, version: str, metadata: Dict[str, Any], db=None, cur=None ) -> None: db.metadata_fetcher_add(name, version, metadata) send_metric("metadata_fetcher:add", count=1, method_name="metadata_fetcher") @timed @db_transaction(statement_timeout=500) def metadata_fetcher_get( self, name: str, version: str, db=None, cur=None ) -> Optional[Dict[str, Any]]: row = db.metadata_fetcher_get(name, version, cur=cur) if not row: return None return dict(zip(db.metadata_fetcher_cols, row)) @timed @db_transaction() def metadata_authority_add( self, type: str, url: str, metadata: Dict[str, Any], db=None, cur=None ) -> None: db.metadata_authority_add(type, url, metadata, cur) send_metric("metadata_authority:add", count=1, method_name="metadata_authority") @timed @db_transaction() def metadata_authority_get( self, type: str, url: str, db=None, cur=None ) -> Optional[Dict[str, Any]]: row = db.metadata_authority_get(type, url, cur=cur) if not row: return None return dict(zip(db.metadata_authority_cols, row)) @timed def diff_directories(self, from_dir, to_dir, track_renaming=False): return diff.diff_directories(self, from_dir, to_dir, track_renaming) @timed def diff_revisions(self, from_rev, to_rev, track_renaming=False): return diff.diff_revisions(self, from_rev, to_rev, track_renaming) @timed def diff_revision(self, revision, track_renaming=False): return diff.diff_revision(self, revision, track_renaming) def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: """Do nothing """ return None def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: return {} def _get_authority_id(self, authority: Dict[str, Any], db, cur): authority_id = db.metadata_authority_get_id( authority["type"], authority["url"], cur ) if not authority_id: raise StorageArgumentException(f"Unknown authority {authority}") return authority_id def _get_fetcher_id(self, fetcher: Dict[str, Any], db, cur): fetcher_id = db.metadata_fetcher_get_id( fetcher["name"], fetcher["version"], cur ) if not fetcher_id: raise StorageArgumentException(f"Unknown fetcher {fetcher}") return fetcher_id