Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import random | import random | ||||
import select | import select | ||||
from typing import Any, Dict, Optional, Tuple | |||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.db_utils import stored_procedure, jsonize | from swh.core.db.db_utils import stored_procedure, jsonize | ||||
from swh.core.db.db_utils import execute_values_generator | from swh.core.db.db_utils import execute_values_generator | ||||
from swh.model.model import OriginVisit, SHA1_SIZE | from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE | ||||
class Db(BaseDb): | class Db(BaseDb): | ||||
"""Proxy to the SWH DB, with wrappers around stored procedures | """Proxy to the SWH DB, with wrappers around stored procedures | ||||
""" | """ | ||||
def mktemp_dir_entry(self, entry_type, cur=None): | def mktemp_dir_entry(self, entry_type, cur=None): | ||||
▲ Show 20 Lines • Show All 232 Lines • ▼ Show 20 Lines | ): | ||||
cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) | cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) | ||||
yield from cur | yield from cur | ||||
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
    """Return the snapshot recorded by the latest status of a visit.

    Args:
        origin_url: URL of the origin, matched against ``origin.url``
        visit_id: visit number, matched against ``origin_visit.visit``
        cur: optional cursor, handed to ``self._cursor``

    Returns:
        The ``snapshot`` column of the most recent (by ``date``)
        ``origin_visit_status`` row of that visit, or None when no row
        matches.

    """
    cur = self._cursor(cur)
    # A visit can have several status rows; ordering by date (descending)
    # with LIMIT 1 keeps only the most recent one.
    query = """\
        SELECT ovs.snapshot
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_status ovs
          ON ov.origin = ovs.origin AND ov.visit = ovs.visit
        WHERE o.url=%s AND ov.visit=%s
        ORDER BY ovs.date DESC LIMIT 1
        """
    cur.execute(query, (origin_url, visit_id))
    row = cur.fetchone()
    return row[0] if row else None
def snapshot_get_random(self, cur=None): | def snapshot_get_random(self, cur=None): | ||||
▲ Show 20 Lines • Show All 163 Lines • ▼ Show 20 Lines | def origin_visit_add(self, origin, ts, type, cur=None): | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
self._cursor(cur).execute( | self._cursor(cur).execute( | ||||
"SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type) | "SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type) | ||||
) | ) | ||||
return cur.fetchone()[0] | return cur.fetchone()[0] | ||||
# Columns of the origin_visit_status table, in the order used both for
# inserting rows and for building result dicts. "origin" must stay first and
# "metadata" last: origin_visit_status_add treats these two specially.
origin_visit_status_cols = [
    "origin",
    "visit",
    "date",
    "status",
    "snapshot",
    "metadata",
]

def origin_visit_status_add(
    self, visit_status: OriginVisitStatus, cur=None
) -> None:
    """Insert one row in origin_visit_status for the given visit status.

    The origin is resolved from ``visit_status.origin`` through a lookup on
    ``origin.url``; ``visit_status.metadata`` goes through ``jsonize``; the
    remaining columns are read from the same-named attributes of
    ``visit_status``. A row that conflicts on (origin, visit, date) is left
    untouched (``ON CONFLICT ... do nothing``).

    Args:
        visit_status: the status to record
        cur: optional cursor, handed to ``self._cursor``

    """
    # Internal invariants on the class-level column list: the first and last
    # columns are handled by hand below, everything in between generically.
    assert self.origin_visit_status_cols[0] == "origin"
    assert self.origin_visit_status_cols[-1] == "metadata"
    cols = self.origin_visit_status_cols[1:-1]

    column_names = ", ".join(cols)
    placeholders = ", ".join(["%s"] * len(cols))
    query = (
        "WITH origin_id as (select id from origin where url=%s) "
        "INSERT INTO origin_visit_status "
        f"(origin, {column_names}, metadata) "
        "VALUES ((select id from origin_id), "
        f"{placeholders}, %s) "
        "ON CONFLICT (origin, visit, date) do nothing"
    )
    # Parameter order mirrors the placeholders: origin url for the CTE, then
    # the middle columns, then the serialized metadata.
    params = (
        [visit_status.origin]
        + [getattr(visit_status, key) for key in cols]
        + [jsonize(visit_status.metadata)]
    )

    cur = self._cursor(cur)
    cur.execute(query, params)
def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: | def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: | ||||
# doing an extra query like this is way simpler than trying to join | # doing an extra query like this is way simpler than trying to join | ||||
# the origin id in the query below | # the origin id in the query below | ||||
ov = origin_visit | ov = origin_visit | ||||
origin_id = next(self.origin_id_get_by_url([ov.origin])) | origin_id = next(self.origin_id_get_by_url([ov.origin])) | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
Show All 24 Lines | origin_visit_get_cols = [ | ||||
"visit", | "visit", | ||||
"date", | "date", | ||||
"type", | "type", | ||||
"status", | "status", | ||||
"metadata", | "metadata", | ||||
"snapshot", | "snapshot", | ||||
] | ] | ||||
# SQL select list shared by the origin_visit read queries. The table aliases
# are part of the contract: every query interpolating this list must alias
# origin as "o", origin_visit as "ov" and origin_visit_status as "ovs".
origin_visit_select_cols = [
    "o.url AS origin",
    "ov.visit",
    "ov.date",
    "ov.type AS type",
    "ovs.status",
    "ovs.metadata",
    "ovs.snapshot",
]
def _make_origin_visit_status(
    self, row: Optional[Tuple[Any, ...]]
) -> Optional[Dict[str, Any]]:
    """Make an origin_visit_status dict out of a row

    Args:
        row: values in the order of ``origin_visit_status_cols``, or a falsy
            value (None, empty tuple) when there is no row

    Returns:
        A dict mapping each name of ``origin_visit_status_cols`` to the
        matching value of ``row``, or None when ``row`` is falsy.

    """
    if not row:
        return None
    return dict(zip(self.origin_visit_status_cols, row))
def origin_visit_status_get_latest(
    self, origin: str, visit: int, cur=None
) -> Optional[Dict[str, Any]]:
    """Given an origin visit id, return its latest origin_visit_status

    Args:
        origin: URL of the origin, matched against ``origin.url``
        visit: visit number, matched against ``origin_visit_status.visit``
        cur: optional cursor, handed to ``self._cursor``

    Returns:
        The most recent (by ``date``) status row of that visit as a dict
        keyed by ``origin_visit_status_cols``, or None when there is none.

    """
    cols = self.origin_visit_status_cols
    cur = self._cursor(cur)
    # Adjacent string literals are concatenated verbatim, so every fragment
    # must end with a space: without it the WHERE clause runs straight into
    # ORDER BY ("...=%sORDER BY") and the statement is malformed.
    query = (
        f"SELECT {', '.join(cols)} "
        "FROM origin_visit_status ovs "
        "INNER JOIN origin o on o.id=ovs.origin "
        "WHERE o.url=%s AND ovs.visit=%s "
        "ORDER BY ovs.date DESC LIMIT 1"
    )
    cur.execute(query, (origin, visit))
    row = cur.fetchone()
    return self._make_origin_visit_status(row)
def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None):
    """Retrieve all visits of an origin, each with its latest status.

    Args:
        origin_id: despite its name, the origin's URL (it is matched against
            ``origin.url``)
        last_visit: when truthy, only visits whose number is strictly
            greater than this value are returned
        limit: value bound to the SQL ``LIMIT`` clause
        cur: optional cursor, handed to ``self._cursor``

    Yields:
        One row per visit, with the columns of ``origin_visit_select_cols``,
        ordered by ascending visit number.

    """
    cur = self._cursor(cur)

    if last_visit:
        extra_condition = "and ov.visit > %s"
        args = (origin_id, last_visit, limit)
    else:
        extra_condition = ""
        args = (origin_id, limit)

    columns = ", ".join(self.origin_visit_select_cols)
    # DISTINCT ON (ov.visit) combined with the ORDER BY below keeps, for each
    # visit, only the row joined with its most recent status.
    query = f"""\
        SELECT DISTINCT ON (ov.visit) {columns}
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_status ovs
          ON ov.origin = ovs.origin AND ov.visit = ovs.visit
        WHERE o.url=%s {extra_condition}
        ORDER BY ov.visit ASC, ovs.date DESC
        LIMIT %s"""

    cur.execute(query, args)

    yield from cur
def origin_visit_get(self, origin_id, visit_id, cur=None):
    """Retrieve information on visit visit_id of an origin.

    Args:
        origin_id: despite its name, the origin's URL (it is matched against
            ``origin.url``)
        visit_id: the visit number for that origin
        cur: optional cursor, handed to ``self._cursor``

    Returns:
        A row with the columns of ``origin_visit_select_cols``, built from
        the visit and its most recent (by ``date``) status, or None when no
        row matches.

    """
    cur = self._cursor(cur)

    columns = ", ".join(self.origin_visit_select_cols)
    query = f"""\
        SELECT {columns}
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_status ovs
          ON ov.origin = ovs.origin AND ov.visit = ovs.visit
        WHERE o.url = %s AND ov.visit = %s
        ORDER BY ovs.date DESC
        LIMIT 1
        """

    cur.execute(query, (origin_id, visit_id))
    # The query returns at most one row (LIMIT 1); fetchone() gives that row,
    # or None when there is none.
    return cur.fetchone()
def origin_visit_find_by_date(self, origin, visit_date, cur=None):
    """Look up a visit of an origin by date, using swh_visit_find_by_date.

    Args:
        origin: first argument given to the ``swh_visit_find_by_date``
            stored function; it is also stored as-is under the "origin" key
            of the result
        visit_date: second argument given to the stored function
        cur: optional cursor, handed to ``self._cursor``

    Returns:
        A dict built by pairing ``origin_visit_get_cols`` with the first row
        returned, whose "origin" entry is then replaced by ``origin``; None
        when the function returns no row.

    """
    cur = self._cursor(cur)
    cur.execute(
        "SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date)
    )
    rows = cur.fetchall()
    if not rows:
        return None
    visit = dict(zip(self.origin_visit_get_cols, rows[0]))
    # Whatever the row holds in its "origin" column, callers get back the
    # value they passed in.
    visit["origin"] = origin
    return visit
def origin_visit_exists(self, origin_id, visit_id, cur=None): | def origin_visit_exists(self, origin_id, visit_id, cur=None): | ||||
"""Check whether an origin visit with the given ids exists""" | """Check whether an origin visit with the given ids exists""" | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | ||||
cur.execute(query, (origin_id, visit_id)) | cur.execute(query, (origin_id, visit_id)) | ||||
Show All 14 Lines | ): | ||||
Returns: | Returns: | ||||
The origin_visit information, or None if no visit matches. | The origin_visit information, or None if no visit matches. | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query_parts = [ | query_parts = [ | ||||
"SELECT %s" % ", ".join(self.origin_visit_select_cols), | "SELECT %s" % ", ".join(self.origin_visit_select_cols), | ||||
"FROM origin_visit", | "FROM origin_visit ov ", | ||||
"INNER JOIN origin ON origin.id = origin_visit.origin", | "INNER JOIN origin o ON o.id = ov.origin", | ||||
"INNER JOIN origin_visit_status ovs ", | |||||
"ON o.id = ovs.origin AND ov.visit = ovs.visit ", | |||||
] | ] | ||||
query_parts.append("WHERE origin.url = %s") | query_parts.append("WHERE o.url = %s") | ||||
if require_snapshot: | if require_snapshot: | ||||
query_parts.append("AND snapshot is not null") | query_parts.append("AND ovs.snapshot is not null") | ||||
if allowed_statuses: | if allowed_statuses: | ||||
query_parts.append( | query_parts.append( | ||||
cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode() | cur.mogrify("AND ovs.status IN %s", (tuple(allowed_statuses),)).decode() | ||||
) | ) | ||||
query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1") | query_parts.append( | ||||
"ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" | |||||
) | |||||
query = "\n".join(query_parts) | query = "\n".join(query_parts) | ||||
cur.execute(query, (origin_id,)) | cur.execute(query, (origin_id,)) | ||||
r = cur.fetchone() | r = cur.fetchone() | ||||
if not r: | if not r: | ||||
return None | return None | ||||
return r | return r | ||||
def origin_visit_get_random(self, type, cur=None):
    """Randomly select one origin visit that was full and in the last 3
    months

    Args:
        type: visit type, matched against ``origin_visit.type``
        cur: optional cursor, handed to ``self._cursor``

    Returns:
        A row with the columns of ``origin_visit_select_cols``, or None when
        no row passes the filters. Because candidates are kept by a
        ``random() < 0.1`` filter, None can be returned even when matching
        visits exist.

    """
    cur = self._cursor(cur)
    columns = ",".join(self.origin_visit_select_cols)
    # random() < 0.1 keeps only a random subset of the matching rows; the
    # first row of that subset is returned.
    query = f"""select {columns}
        from origin_visit ov
        inner join origin o on ov.origin=o.id
        inner join origin_visit_status ovs
          on ov.origin = ovs.origin and ov.visit = ovs.visit
        where ovs.status='full'
          and ov.type=%s
          and ov.date > now() - '3 months'::interval
          and random() < 0.1
        limit 1
        """
    cur.execute(query, (type,))
    return cur.fetchone()
@staticmethod | @staticmethod | ||||
def mangle_query_key(key, main_table): | def mangle_query_key(key, main_table): | ||||
if key == "id": | if key == "id": | ||||
▲ Show 20 Lines • Show All 217 Lines • ▼ Show 20 Lines | ): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
if count: | if count: | ||||
origin_cols = "COUNT(*)" | origin_cols = "COUNT(*)" | ||||
else: | else: | ||||
origin_cols = ",".join(self.origin_cols) | origin_cols = ",".join(self.origin_cols) | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM origin | FROM origin o | ||||
WHERE """ | WHERE """ | ||||
if with_visit: | if with_visit: | ||||
query += """ | query += """ | ||||
EXISTS ( | EXISTS ( | ||||
SELECT 1 | SELECT 1 | ||||
FROM origin_visit | FROM origin_visit ov | ||||
INNER JOIN snapshot ON snapshot=snapshot.id | INNER JOIN origin_visit_status ovs | ||||
WHERE origin=origin.id | ON ov.origin = ovs.origin AND ov.visit = ovs.visit | ||||
INNER JOIN snapshot ON ovs.snapshot=snapshot.id | |||||
WHERE ov.origin=o.id | |||||
) | ) | ||||
AND """ | AND """ | ||||
query += "url %s %%s " | query += "url %s %%s " | ||||
if not count: | if not count: | ||||
query += "ORDER BY id OFFSET %%s LIMIT %%s" | query += "ORDER BY id OFFSET %%s LIMIT %%s" | ||||
if not regexp: | if not regexp: | ||||
query = query % (origin_cols, "ILIKE") | query = query % (origin_cols, "ILIKE") | ||||
▲ Show 20 Lines • Show All 237 Lines • Show Last 20 Lines |