Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import random | import random | ||||
import select | import select | ||||
from typing import Any, Dict, Optional, Tuple | |||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.db_utils import stored_procedure, jsonize | from swh.core.db.db_utils import stored_procedure, jsonize | ||||
from swh.core.db.db_utils import execute_values_generator | from swh.core.db.db_utils import execute_values_generator | ||||
from swh.model.model import OriginVisit, SHA1_SIZE | from swh.model.model import OriginVisit, OriginVisitUpdate, SHA1_SIZE | ||||
class Db(BaseDb): | class Db(BaseDb): | ||||
"""Proxy to the SWH DB, with wrappers around stored procedures | """Proxy to the SWH DB, with wrappers around stored procedures | ||||
""" | """ | ||||
def mktemp_dir_entry(self, entry_type, cur=None): | def mktemp_dir_entry(self, entry_type, cur=None): | ||||
▲ Show 20 Lines • Show All 183 Lines • ▼ Show 20 Lines | def snapshot_get_by_id(self, snapshot_id, branches_from=b'', | ||||
cur.execute(query, (snapshot_id, branches_from, branches_count, | cur.execute(query, (snapshot_id, branches_from, branches_count, | ||||
target_types)) | target_types)) | ||||
yield from cur | yield from cur | ||||
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
    """Return the snapshot attached to the newest update of a visit.

    Args:
        origin_url: value matched against ``origin.url``
        visit_id: value matched against ``origin_visit.visit``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        The ``snapshot`` column of the matching ``origin_visit_update``
        row with the greatest ``date``, or None when no row matches.

    """
    cursor = self._cursor(cur)
    query = """\
        SELECT ovu.snapshot
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_update ovu
          ON ov.origin = ovu.origin AND ov.visit = ovu.visit
        WHERE o.url=%s AND ov.visit=%s
        ORDER BY ovu.date DESC LIMIT 1
        """
    cursor.execute(query, (origin_url, visit_id))
    row = cursor.fetchone()
    # fetchone() yields nothing truthy when there is no matching row
    return row[0] if row else None
def snapshot_get_random(self, cur=None): | def snapshot_get_random(self, cur=None): | ||||
▲ Show 20 Lines • Show All 123 Lines • ▼ Show 20 Lines | def origin_visit_add(self, origin, ts, type, cur=None): | ||||
The new visit index step for that origin | The new visit index step for that origin | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
self._cursor(cur).execute('SELECT swh_origin_visit_add(%s, %s, %s)', | self._cursor(cur).execute('SELECT swh_origin_visit_add(%s, %s, %s)', | ||||
(origin, ts, type)) | (origin, ts, type)) | ||||
return cur.fetchone()[0] | return cur.fetchone()[0] | ||||
# Columns of the origin_visit_update table, in the order used both for
# inserting rows and for turning fetched rows into dicts.
origin_visit_update_cols = [
    'origin', 'visit', 'date', 'status', 'snapshot', 'metadata']

def origin_visit_update_add(
        self, visit_update: OriginVisitUpdate, cur=None) -> None:
    """Insert one row into origin_visit_update.

    The origin id is looked up from ``visit_update.origin`` (matched
    against ``origin.url``) inside the same statement. A row that already
    exists for the same (origin, visit, date) is left untouched.

    Args:
        visit_update: object whose attributes named after
            ``origin_visit_update_cols`` provide the values to insert
        cur: optional cursor, resolved through ``self._cursor``

    """
    all_cols = self.origin_visit_update_cols
    # 'origin' and 'metadata' are handled specially below (sub-select and
    # jsonize respectively), so they must sit at the two ends of the list.
    assert all_cols[0] == 'origin'
    assert all_cols[-1] == 'metadata'
    plain_cols = all_cols[1:-1]

    cursor = self._cursor(cur)
    column_list = ', '.join(plain_cols)
    placeholders = ', '.join('%s' for _ in plain_cols)
    query = (
        "WITH origin_id as (select id from origin where url=%s) "
        "INSERT INTO origin_visit_update "
        f"(origin, {column_list}, metadata) "
        "VALUES ((select id from origin_id), "
        f"{placeholders}, %s) "
        "ON CONFLICT (origin, visit, date) do nothing"
    )
    params = [visit_update.origin]
    params.extend(getattr(visit_update, name) for name in plain_cols)
    params.append(jsonize(visit_update.metadata))
    cursor.execute(query, params)
def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None:
    """Insert a row into origin_visit, or overwrite it if it exists.

    On a conflict with the ``origin_visit_pkey`` constraint, every column
    listed in ``origin_visit_get_cols`` is replaced by the new value.

    Args:
        origin_visit: the visit to store; its ``origin`` attribute is
            resolved to an origin id through ``origin_id_get_by_url``
        cur: optional cursor, resolved through ``self._cursor``

    """
    # Resolving the origin id with a separate query is far simpler than
    # trying to join on the origin table in the INSERT below.
    origin_id = next(self.origin_id_get_by_url([origin_visit.origin]))

    cursor = self._cursor(cur)
    columns = self.origin_visit_get_cols
    column_list = ', '.join(columns)
    placeholders = ', '.join(['%s'] * len(columns))
    assignments = ', '.join(f'{col}=excluded.{col}' for col in columns)
    query = """INSERT INTO origin_visit ({cols}) VALUES ({values})
               ON CONFLICT ON CONSTRAINT origin_visit_pkey DO
               UPDATE SET {updates}""".format(
        cols=column_list, values=placeholders, updates=assignments)
    # Values are listed in the same order as origin_visit_get_cols
    params = (
        origin_id,
        origin_visit.visit,
        origin_visit.date,
        origin_visit.type,
        origin_visit.status,
        origin_visit.metadata,
        origin_visit.snapshot,
    )
    cursor.execute(query, params)
# Column names of a visit, in positional order. origin_visit_upsert uses
# this list for its INSERT column list and origin_visit_find_by_date zips
# it with a result row, so the order matters.
origin_visit_get_cols = [
    'origin', 'visit', 'date', 'type',
    'status', 'metadata', 'snapshot']

# SELECT expressions for queries that join origin_visit (aliased ov),
# origin (aliased o) and origin_visit_update (aliased ovu); the origin is
# exposed as its URL rather than its id.
origin_visit_select_cols = [
    'o.url AS origin', 'ov.visit', 'ov.date', 'ov.type AS type',
    'ovu.status', 'ovu.metadata', 'ovu.snapshot']
def _make_origin_visit_update(
        self, row: Tuple[Any]) -> Optional[Dict[str, Any]]:
    """Convert a fetched row into an origin_visit_update dict.

    Args:
        row: values ordered like ``origin_visit_update_cols``

    Returns:
        A dict pairing each column name with the row value at the same
        position, or None when ``row`` is falsy (no row fetched).

    """
    return dict(zip(self.origin_visit_update_cols, row)) if row else None
def origin_visit_update_get_latest(
        self, origin: str, visit: int,
        cur=None) -> Optional[Dict[str, Any]]:
    """Return the most recent origin_visit_update of a given visit.

    Args:
        origin: value matched against ``origin.url``
        visit: value matched against ``origin_visit_update.visit``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        The matching row with the greatest ``date``, converted by
        ``_make_origin_visit_update`` into a dict keyed by
        ``origin_visit_update_cols``, or None when no row matches.

    """
    cols = self.origin_visit_update_cols
    cur = self._cursor(cur)
    # Adjacent string literals are concatenated verbatim: every fragment
    # but the last must end with a space, otherwise the visit placeholder
    # would be glued to the ORDER BY keyword.
    cur.execute(f"SELECT {', '.join(cols)} "
                "FROM origin_visit_update ovu "
                "INNER JOIN origin o on o.id=ovu.origin "
                "WHERE o.url=%s AND ovu.visit=%s "
                "ORDER BY ovu.date DESC LIMIT 1",
                (origin, visit))
    row = cur.fetchone()
    return self._make_origin_visit_update(row)
def origin_visit_get_all(self, origin_id,
                         last_visit=None, limit=None, cur=None):
    """Iterate over the visits of an origin, in ascending visit order.

    Each visit is paired with its most recent origin_visit_update
    (``DISTINCT ON (ov.visit)`` combined with ``ovu.date DESC``).

    Args:
        origin_id: value matched against ``origin.url``
        last_visit: when truthy, only visits strictly greater than this
            one are returned
        limit: value passed to the SQL ``LIMIT`` clause
        cur: optional cursor, resolved through ``self._cursor``

    Yields:
        Rows whose columns follow ``origin_visit_select_cols``

    """
    cursor = self._cursor(cur)

    # Build the positional parameters alongside the optional filter
    params = [origin_id]
    visit_filter = ''
    if last_visit:
        visit_filter = 'and ov.visit > %s'
        params.append(last_visit)
    params.append(limit)

    select_list = ', '.join(self.origin_visit_select_cols)
    query = """\
        SELECT DISTINCT ON (ov.visit) %s
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_update ovu
          ON ov.origin = ovu.origin AND ov.visit = ovu.visit
        WHERE o.url=%%s %s
        ORDER BY ov.visit ASC, ovu.date DESC
        LIMIT %%s""" % (select_list, visit_filter)

    cursor.execute(query, tuple(params))
    yield from cursor
def origin_visit_get(self, origin_id, visit_id, cur=None):
    """Fetch one visit of an origin with its most recent update.

    Args:
        origin_id: value matched against ``origin.url``
        visit_id: value matched against ``origin_visit.visit``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        A row whose columns follow ``origin_visit_select_cols``, or None
        when nothing matches.

    """
    cursor = self._cursor(cur)
    select_list = ', '.join(self.origin_visit_select_cols)
    query = """\
        SELECT %s
        FROM origin_visit ov
        INNER JOIN origin o ON o.id = ov.origin
        INNER JOIN origin_visit_update ovu
          ON ov.origin = ovu.origin AND ov.visit = ovu.visit
        WHERE o.url = %%s AND ov.visit = %%s
        ORDER BY ovu.date DESC
        LIMIT 1
        """ % select_list
    cursor.execute(query, (origin_id, visit_id))
    rows = cursor.fetchall()
    return rows[0] if rows else None
def origin_visit_find_by_date(self, origin, visit_date, cur=None):
    """Look up a visit of an origin through swh_visit_find_by_date.

    Args:
        origin: first argument of the ``swh_visit_find_by_date`` stored
            procedure; also stored under the ``origin`` key of the result
        visit_date: second argument of the stored procedure
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        A dict keyed by ``origin_visit_get_cols`` built from the first
        returned row, with ``origin`` overwritten by the ``origin``
        argument; None when the procedure returns no row.

    """
    cursor = self._cursor(cur)
    cursor.execute(
        'SELECT * FROM swh_visit_find_by_date(%s, %s)',
        (origin, visit_date))
    rows = cursor.fetchall()
    if not rows:
        return None
    visit = dict(zip(self.origin_visit_get_cols, rows[0]))
    # Replace whatever the row carries as origin with the caller's value
    visit.update(origin=origin)
    return visit
def origin_visit_exists(self, origin_id, visit_id, cur=None): | def origin_visit_exists(self, origin_id, visit_id, cur=None): | ||||
"""Check whether an origin visit with the given ids exists""" | """Check whether an origin visit with the given ids exists""" | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | ||||
cur.execute(query, (origin_id, visit_id)) | cur.execute(query, (origin_id, visit_id)) | ||||
Show All 14 Lines | def origin_visit_get_latest( | ||||
Returns: | Returns: | ||||
The origin_visit information, or None if no visit matches. | The origin_visit information, or None if no visit matches. | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query_parts = [ | query_parts = [ | ||||
'SELECT %s' % ', '.join(self.origin_visit_select_cols), | 'SELECT %s' % ', '.join(self.origin_visit_select_cols), | ||||
'FROM origin_visit', | 'FROM origin_visit ov ', | ||||
'INNER JOIN origin ON origin.id = origin_visit.origin'] | 'INNER JOIN origin o ON o.id = ov.origin', | ||||
'INNER JOIN origin_visit_update ovu ', | |||||
'ON o.id = ovu.origin AND ov.visit = ovu.visit ', | |||||
] | |||||
query_parts.append('WHERE origin.url = %s') | query_parts.append('WHERE o.url = %s') | ||||
if require_snapshot: | if require_snapshot: | ||||
query_parts.append('AND snapshot is not null') | query_parts.append('AND ovu.snapshot is not null') | ||||
if allowed_statuses: | if allowed_statuses: | ||||
query_parts.append( | query_parts.append( | ||||
cur.mogrify('AND status IN %s', | cur.mogrify('AND ovu.status IN %s', | ||||
(tuple(allowed_statuses),)).decode()) | (tuple(allowed_statuses),)).decode()) | ||||
query_parts.append('ORDER BY date DESC, visit DESC LIMIT 1') | query_parts.append( | ||||
'ORDER BY ov.date DESC, ov.visit DESC, ovu.date DESC LIMIT 1') | |||||
query = '\n'.join(query_parts) | query = '\n'.join(query_parts) | ||||
cur.execute(query, (origin_id,)) | cur.execute(query, (origin_id,)) | ||||
r = cur.fetchone() | r = cur.fetchone() | ||||
if not r: | if not r: | ||||
return None | return None | ||||
return r | return r | ||||
def origin_visit_get_random(self, type, cur=None):
    """Pick one recent visit of the given type that has a 'full' update.

    Candidates are visits dated within the last 3 months; each joined
    row is kept with probability 0.1 and the first survivor is returned.

    Args:
        type: value matched against ``origin_visit.type``
        cur: optional cursor, resolved through ``self._cursor``

    Returns:
        A row whose columns follow ``origin_visit_select_cols``, or
        whatever ``fetchone()`` returns when no row is selected.

    """
    cursor = self._cursor(cur)
    select_list = ','.join(self.origin_visit_select_cols)
    query = f"""select {select_list}
        from origin_visit ov
        inner join origin o on ov.origin=o.id
        inner join origin_visit_update ovu
          on ov.origin = ovu.origin and ov.visit = ovu.visit
        where ovu.status='full'
          and ov.type=%s
          and ov.date > now() - '3 months'::interval
          and random() < 0.1
        limit 1
        """
    cursor.execute(query, (type, ))
    return cursor.fetchone()
@staticmethod | @staticmethod | ||||
def mangle_query_key(key, main_table): | def mangle_query_key(key, main_table): | ||||
if key == 'id': | if key == 'id': | ||||
▲ Show 20 Lines • Show All 193 Lines • ▼ Show 20 Lines | def _origin_query(self, url_pattern, count=False, offset=0, limit=50, | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
if count: | if count: | ||||
origin_cols = 'COUNT(*)' | origin_cols = 'COUNT(*)' | ||||
else: | else: | ||||
origin_cols = ','.join(self.origin_cols) | origin_cols = ','.join(self.origin_cols) | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM origin | FROM origin o | ||||
WHERE """ | WHERE """ | ||||
if with_visit: | if with_visit: | ||||
query += """ | query += """ | ||||
EXISTS ( | EXISTS ( | ||||
SELECT 1 | SELECT 1 | ||||
FROM origin_visit | FROM origin_visit ov | ||||
INNER JOIN snapshot ON snapshot=snapshot.id | INNER JOIN origin_visit_update ovu | ||||
WHERE origin=origin.id | ON ov.origin = ovu.origin AND ov.visit = ovu.visit | ||||
INNER JOIN snapshot ON ovu.snapshot=snapshot.id | |||||
WHERE ov.origin=o.id | |||||
) | ) | ||||
AND """ | AND """ | ||||
query += 'url %s %%s ' | query += 'url %s %%s ' | ||||
if not count: | if not count: | ||||
query += 'ORDER BY id OFFSET %%s LIMIT %%s' | query += 'ORDER BY id OFFSET %%s LIMIT %%s' | ||||
if not regexp: | if not regexp: | ||||
query = query % (origin_cols, 'ILIKE') | query = query % (origin_cols, 'ILIKE') | ||||
▲ Show 20 Lines • Show All 200 Lines • Show Last 20 Lines |