Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
| # Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
| # See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
| # License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
| # See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
| import random | import random | ||||
| import select | import select | ||||
| from typing import Any, Dict, Optional, Tuple | |||||
| from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
| from swh.core.db.db_utils import stored_procedure, jsonize | from swh.core.db.db_utils import stored_procedure, jsonize | ||||
| from swh.core.db.db_utils import execute_values_generator | from swh.core.db.db_utils import execute_values_generator | ||||
| from swh.model.model import OriginVisit, SHA1_SIZE | from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE | ||||
| class Db(BaseDb): | class Db(BaseDb): | ||||
| """Proxy to the SWH DB, with wrappers around stored procedures | """Proxy to the SWH DB, with wrappers around stored procedures | ||||
| """ | """ | ||||
| def mktemp_dir_entry(self, entry_type, cur=None): | def mktemp_dir_entry(self, entry_type, cur=None): | ||||
| ▲ Show 20 Lines • Show All 232 Lines • ▼ Show 20 Lines | ): | ||||
| cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) | cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) | ||||
| yield from cur | yield from cur | ||||
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
    """Return the snapshot attached to the latest status of a visit.

    Args:
        origin_url: value matched against ``origin.url``
        visit_id: value matched against ``origin_visit.visit``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        The ``snapshot`` column of the most recent (by ``ovs.date``)
        origin_visit_status row of that visit, or None when no row matches.

    """
    cursor = self._cursor(cur)
    cursor.execute(
        """\
        SELECT ovs.snapshot
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_status ovs
          ON ov.origin = ovs.origin AND ov.visit = ovs.visit
        WHERE o.url=%s AND ov.visit=%s
        ORDER BY ovs.date DESC LIMIT 1
        """,
        (origin_url, visit_id),
    )
    row = cursor.fetchone()
    # Only the first (and only) selected column is of interest.
    return row[0] if row else None
| def snapshot_get_random(self, cur=None): | def snapshot_get_random(self, cur=None): | ||||
| ▲ Show 20 Lines • Show All 163 Lines • ▼ Show 20 Lines | def origin_visit_add(self, origin, ts, type, cur=None): | ||||
| """ | """ | ||||
| cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
| self._cursor(cur).execute( | self._cursor(cur).execute( | ||||
| "SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type) | "SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type) | ||||
| ) | ) | ||||
| return cur.fetchone()[0] | return cur.fetchone()[0] | ||||
# Columns of the origin_visit_status table, in the order used by the
# queries below.  origin_visit_status_add relies on "origin" being first
# and "metadata" being last.
origin_visit_status_cols = [
    "origin",
    "visit",
    "date",
    "status",
    "snapshot",
    "metadata",
]

def origin_visit_status_add(
    self, visit_status: OriginVisitStatus, cur=None
) -> None:
    """Insert one row in origin_visit_status for the given visit status.

    The origin id is looked up from ``visit_status.origin`` (matched against
    ``origin.url``) inside the same statement, and the metadata is passed
    through ``jsonize``.  An existing row with the same
    (origin, visit, date) is left untouched (``ON CONFLICT ... do nothing``).

    Args:
        visit_status: object providing the ``origin``, ``visit``, ``date``,
            ``status``, ``snapshot`` and ``metadata`` attributes
        cur: optional cursor, resolved through ``self._cursor``

    """
    all_cols = self.origin_visit_status_cols
    # First and last columns get dedicated handling (sub-select / jsonize).
    assert all_cols[0] == "origin"
    assert all_cols[-1] == "metadata"
    plain_cols = all_cols[1:-1]

    cursor = self._cursor(cur)

    column_list = ", ".join(plain_cols)
    placeholders = ", ".join(["%s"] * len(plain_cols))
    query = (
        "WITH origin_id as (select id from origin where url=%s) "
        "INSERT INTO origin_visit_status "
        f"(origin, {column_list}, metadata) "
        "VALUES ((select id from origin_id), "
        f"{placeholders}, %s) "
        "ON CONFLICT (origin, visit, date) do nothing"
    )

    params = [visit_status.origin]
    params.extend(getattr(visit_status, name) for name in plain_cols)
    params.append(jsonize(visit_status.metadata))

    cursor.execute(query, params)
| def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: | def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: | ||||
| # doing an extra query like this is way simpler than trying to join | # doing an extra query like this is way simpler than trying to join | ||||
| # the origin id in the query below | # the origin id in the query below | ||||
| ov = origin_visit | ov = origin_visit | ||||
| origin_id = next(self.origin_id_get_by_url([ov.origin])) | origin_id = next(self.origin_id_get_by_url([ov.origin])) | ||||
| cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
| Show All 24 Lines | origin_visit_get_cols = [ | ||||
| "visit", | "visit", | ||||
| "date", | "date", | ||||
| "type", | "type", | ||||
| "status", | "status", | ||||
| "metadata", | "metadata", | ||||
| "snapshot", | "snapshot", | ||||
| ] | ] | ||||
# SELECT expressions shared by the origin_visit_* getters.  The queries that
# use them alias the tables as: o = origin, ov = origin_visit,
# ovs = origin_visit_status.
origin_visit_select_cols = [
    "o.url AS origin",
    "ov.visit",
    "ov.date",
    "ov.type AS type",
    "ovs.status",
    "ovs.metadata",
    "ovs.snapshot",
]
def _make_origin_visit_status(
    self, row: Optional[Tuple[Any, ...]]
) -> Optional[Dict[str, Any]]:
    """Make an origin_visit_status dict out of a row.

    Args:
        row: values in the order of ``self.origin_visit_status_cols``, or a
            falsy value (e.g. the None returned by ``cursor.fetchone()`` when
            nothing matched)

    Returns:
        A dict mapping each column name to the matching value of the row,
        or None when the row is falsy.

    """
    if not row:
        return None
    return dict(zip(self.origin_visit_status_cols, row))
def origin_visit_status_get_latest(
    self, origin: str, visit: int, cur=None
) -> Optional[Dict[str, Any]]:
    """Given an origin visit id, return its latest origin_visit_status.

    Args:
        origin: value matched against ``origin.url``
        visit: value matched against ``origin_visit_status.visit``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        The most recent row (by ``ovs.date``) as a dict keyed by
        ``self.origin_visit_status_cols``, or None when no row matches.

    """
    # NOTE(review): the selected "origin" column appears to be
    # origin_visit_status.origin (joined against origin.id), not the url
    # given as argument -- confirm this is what callers expect.
    cols = self.origin_visit_status_cols
    cur = self._cursor(cur)
    cur.execute(
        f"SELECT {', '.join(cols)} "
        "FROM origin_visit_status ovs "
        "INNER JOIN origin o on o.id=ovs.origin "
        # The trailing space matters: without it the last placeholder and
        # the ORDER keyword are fused into "%sORDER BY".
        "WHERE o.url=%s AND ovs.visit=%s "
        "ORDER BY ovs.date DESC LIMIT 1",
        (origin, visit),
    )
    row = cur.fetchone()
    return self._make_origin_visit_status(row)
def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None):
    """Iterate over the visits of an origin, each with its latest status.

    Args:
        origin_id: despite its name, this value is matched against
            ``origin.url``
        last_visit: when truthy, only visits with a strictly greater visit
            id are returned
        limit: value bound to the ``LIMIT`` clause
        cur: optional cursor, resolved through ``self._cursor``

    Yields:
        The visits for that origin, in ascending visit order; for each visit
        only the row with the most recent ``ovs.date`` is kept
        (``DISTINCT ON (ov.visit)``).

    """
    cursor = self._cursor(cur)

    params = [origin_id]
    visit_filter = ""
    if last_visit:
        visit_filter = "and ov.visit > %s"
        params.append(last_visit)
    params.append(limit)

    query = """\
        SELECT DISTINCT ON (ov.visit) {cols}
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_status ovs
          ON ov.origin = ovs.origin AND ov.visit = ovs.visit
        WHERE o.url=%s {extra}
        ORDER BY ov.visit ASC, ovs.date DESC
        LIMIT %s""".format(
        cols=", ".join(self.origin_visit_select_cols), extra=visit_filter,
    )

    cursor.execute(query, tuple(params))

    yield from cursor
def origin_visit_get(self, origin_id, visit_id, cur=None):
    """Retrieve information on visit visit_id of origin origin_id.

    Args:
        origin_id: despite its name, this value is matched against
            ``origin.url``
        visit_id: value matched against ``origin_visit.visit``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        The row (columns of ``self.origin_visit_select_cols``) built from the
        visit and its most recent status (by ``ovs.date``), or None when
        nothing matches.

    """
    cursor = self._cursor(cur)

    query = """\
        SELECT {cols}
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_status ovs
          ON ov.origin = ovs.origin AND ov.visit = ovs.visit
        WHERE o.url = %s AND ov.visit = %s
        ORDER BY ovs.date DESC
        LIMIT 1
        """.format(
        cols=", ".join(self.origin_visit_select_cols)
    )

    cursor.execute(query, (origin_id, visit_id))
    rows = cursor.fetchall()
    return rows[0] if rows else None
def origin_visit_find_by_date(self, origin, visit_date, cur=None):
    """Look up a visit of an origin through ``swh_visit_find_by_date``.

    Args:
        origin: first argument given to the stored procedure; also stored
            under the ``origin`` key of the result
        visit_date: second argument given to the stored procedure
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        A dict built from the first returned row, keyed by
        ``self.origin_visit_get_cols`` and with ``origin`` set to the
        argument, or None when the procedure returns no row.

    """
    cursor = self._cursor(cur)
    cursor.execute(
        "SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date)
    )
    found = cursor.fetchall()
    if not found:
        return None
    # The keyword argument overrides whatever "origin" value the row held.
    return dict(zip(self.origin_visit_get_cols, found[0]), origin=origin)
| def origin_visit_exists(self, origin_id, visit_id, cur=None): | def origin_visit_exists(self, origin_id, visit_id, cur=None): | ||||
| """Check whether an origin visit with the given ids exists""" | """Check whether an origin visit with the given ids exists""" | ||||
| cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
| query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | ||||
| cur.execute(query, (origin_id, visit_id)) | cur.execute(query, (origin_id, visit_id)) | ||||
| Show All 14 Lines | ): | ||||
| Returns: | Returns: | ||||
| The origin_visit information, or None if no visit matches. | The origin_visit information, or None if no visit matches. | ||||
| """ | """ | ||||
| cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
| query_parts = [ | query_parts = [ | ||||
| "SELECT %s" % ", ".join(self.origin_visit_select_cols), | "SELECT %s" % ", ".join(self.origin_visit_select_cols), | ||||
| "FROM origin_visit", | "FROM origin_visit ov ", | ||||
| "INNER JOIN origin ON origin.id = origin_visit.origin", | "INNER JOIN origin o ON o.id = ov.origin", | ||||
| "INNER JOIN origin_visit_status ovs ", | |||||
| "ON o.id = ovs.origin AND ov.visit = ovs.visit ", | |||||
| ] | ] | ||||
| query_parts.append("WHERE origin.url = %s") | query_parts.append("WHERE o.url = %s") | ||||
| if require_snapshot: | if require_snapshot: | ||||
| query_parts.append("AND snapshot is not null") | query_parts.append("AND ovs.snapshot is not null") | ||||
| if allowed_statuses: | if allowed_statuses: | ||||
| query_parts.append( | query_parts.append( | ||||
| cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode() | cur.mogrify("AND ovs.status IN %s", (tuple(allowed_statuses),)).decode() | ||||
| ) | ) | ||||
| query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1") | query_parts.append( | ||||
| "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" | |||||
| ) | |||||
| query = "\n".join(query_parts) | query = "\n".join(query_parts) | ||||
| cur.execute(query, (origin_id,)) | cur.execute(query, (origin_id,)) | ||||
| r = cur.fetchone() | r = cur.fetchone() | ||||
| if not r: | if not r: | ||||
| return None | return None | ||||
| return r | return r | ||||
def origin_visit_get_random(self, type, cur=None):
    """Randomly select one origin visit that was full and in the last 3
    months.

    Args:
        type: value matched against ``origin_visit.type``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        Whatever ``fetchone()`` returns for the query: one row with the
        columns of ``self.origin_visit_select_cols``.  Since candidate rows
        are sampled with ``random() < 0.1``, the query may well select
        nothing even though matching visits exist.

    """
    cursor = self._cursor(cur)
    select_list = ",".join(self.origin_visit_select_cols)
    query = "select " + select_list + """
        from origin_visit ov
        inner join origin o on ov.origin=o.id
        inner join origin_visit_status ovs
          on ov.origin = ovs.origin and ov.visit = ovs.visit
        where ovs.status='full'
          and ov.type=%s
          and ov.date > now() - '3 months'::interval
          and random() < 0.1
        limit 1
        """
    cursor.execute(query, (type,))
    return cursor.fetchone()
| @staticmethod | @staticmethod | ||||
| def mangle_query_key(key, main_table): | def mangle_query_key(key, main_table): | ||||
| if key == "id": | if key == "id": | ||||
| ▲ Show 20 Lines • Show All 217 Lines • ▼ Show 20 Lines | ): | ||||
| cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
| if count: | if count: | ||||
| origin_cols = "COUNT(*)" | origin_cols = "COUNT(*)" | ||||
| else: | else: | ||||
| origin_cols = ",".join(self.origin_cols) | origin_cols = ",".join(self.origin_cols) | ||||
| query = """SELECT %s | query = """SELECT %s | ||||
| FROM origin | FROM origin o | ||||
| WHERE """ | WHERE """ | ||||
| if with_visit: | if with_visit: | ||||
| query += """ | query += """ | ||||
| EXISTS ( | EXISTS ( | ||||
| SELECT 1 | SELECT 1 | ||||
| FROM origin_visit | FROM origin_visit ov | ||||
| INNER JOIN snapshot ON snapshot=snapshot.id | INNER JOIN origin_visit_status ovs | ||||
| WHERE origin=origin.id | ON ov.origin = ovs.origin AND ov.visit = ovs.visit | ||||
| INNER JOIN snapshot ON ovs.snapshot=snapshot.id | |||||
| WHERE ov.origin=o.id | |||||
| ) | ) | ||||
| AND """ | AND """ | ||||
| query += "url %s %%s " | query += "url %s %%s " | ||||
| if not count: | if not count: | ||||
| query += "ORDER BY id OFFSET %%s LIMIT %%s" | query += "ORDER BY id OFFSET %%s LIMIT %%s" | ||||
| if not regexp: | if not regexp: | ||||
| query = query % (origin_cols, "ILIKE") | query = query % (origin_cols, "ILIKE") | ||||
| ▲ Show 20 Lines • Show All 237 Lines • Show Last 20 Lines | |||||