Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
Show First 20 Lines • Show All 254 Lines • ▼ Show 20 Lines | ): | ||||
cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) | cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) | ||||
yield from cur | yield from cur | ||||
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None): | def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """\ | query = """\ | ||||
SELECT ovs.snapshot | SELECT snapshot FROM origin_visit | ||||
FROM origin_visit ov | INNER JOIN origin ON origin.id = origin_visit.origin | ||||
INNER JOIN origin o ON o.id = ov.origin | WHERE origin.url=%s AND origin_visit.visit=%s; | ||||
INNER JOIN origin_visit_status ovs | |||||
ON ov.origin = ovs.origin AND ov.visit = ovs.visit | |||||
WHERE o.url=%s AND ov.visit=%s | |||||
ORDER BY ovs.date DESC LIMIT 1 | |||||
""" | """ | ||||
cur.execute(query, (origin_url, visit_id)) | cur.execute(query, (origin_url, visit_id)) | ||||
ret = cur.fetchone() | ret = cur.fetchone() | ||||
if ret: | if ret: | ||||
return ret[0] | return ret[0] | ||||
def snapshot_get_random(self, cur=None): | def snapshot_get_random(self, cur=None): | ||||
▲ Show 20 Lines • Show All 175 Lines • ▼ Show 20 Lines | origin_visit_status_cols = [ | ||||
"status", | "status", | ||||
"snapshot", | "snapshot", | ||||
"metadata", | "metadata", | ||||
] | ] | ||||
def origin_visit_status_add( | def origin_visit_status_add( | ||||
self, visit_status: OriginVisitStatus, cur=None | self, visit_status: OriginVisitStatus, cur=None | ||||
) -> None: | ) -> None: | ||||
"""Add new origin visit status | |||||
""" | |||||
assert self.origin_visit_status_cols[0] == "origin" | assert self.origin_visit_status_cols[0] == "origin" | ||||
assert self.origin_visit_status_cols[-1] == "metadata" | assert self.origin_visit_status_cols[-1] == "metadata" | ||||
cols = self.origin_visit_status_cols[1:-1] | cols = self.origin_visit_status_cols[1:-1] | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute( | cur.execute( | ||||
f"WITH origin_id as (select id from origin where url=%s) " | f"WITH origin_id as (select id from origin where url=%s) " | ||||
f"INSERT INTO origin_visit_status " | f"INSERT INTO origin_visit_status " | ||||
f"(origin, {', '.join(cols)}, metadata) " | f"(origin, {', '.join(cols)}, metadata) " | ||||
f"VALUES ((select id from origin_id), " | f"VALUES ((select id from origin_id), " | ||||
f"{', '.join(['%s']*len(cols))}, %s) " | f"{', '.join(['%s']*len(cols))}, %s) " | ||||
f"ON CONFLICT (origin, visit, date) do nothing", | f"ON CONFLICT (origin, visit, date) do nothing", | ||||
[visit_status.origin] | [visit_status.origin] | ||||
+ [getattr(visit_status, key) for key in cols] | + [getattr(visit_status, key) for key in cols] | ||||
+ [jsonize(visit_status.metadata)], | + [jsonize(visit_status.metadata)], | ||||
) | ) | ||||
def origin_visit_update(self, origin_id, visit_id, updates, cur=None): | |||||
"""Update origin_visit's status.""" | |||||
cur = self._cursor(cur) | |||||
update_cols = [] | |||||
values = [] | |||||
where = ["origin.id = origin_visit.origin", "origin.url=%s", "visit=%s"] | |||||
where_values = [origin_id, visit_id] | |||||
if "status" in updates: | |||||
update_cols.append("status=%s") | |||||
values.append(updates.pop("status")) | |||||
if "metadata" in updates: | |||||
update_cols.append("metadata=%s") | |||||
values.append(jsonize(updates.pop("metadata"))) | |||||
if "snapshot" in updates: | |||||
update_cols.append("snapshot=%s") | |||||
values.append(updates.pop("snapshot")) | |||||
assert not updates, "Unknown fields: %r" % updates | |||||
query = """UPDATE origin_visit | |||||
SET {update_cols} | |||||
FROM origin | |||||
WHERE {where}""".format( | |||||
**{"update_cols": ", ".join(update_cols), "where": " AND ".join(where)} | |||||
) | |||||
cur.execute(query, (*values, *where_values)) | |||||
def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: | def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: | ||||
# doing an extra query like this is way simpler than trying to join | # doing an extra query like this is way simpler than trying to join | ||||
# the origin id in the query below | # the origin id in the query below | ||||
ov = origin_visit | ov = origin_visit | ||||
origin_id = next(self.origin_id_get_by_url([ov.origin])) | origin_id = next(self.origin_id_get_by_url([ov.origin])) | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """INSERT INTO origin_visit ({cols}) VALUES ({values}) | query = """INSERT INTO origin_visit ({cols}) VALUES ({values}) | ||||
Show All 23 Lines | origin_visit_get_cols = [ | ||||
"visit", | "visit", | ||||
"date", | "date", | ||||
"type", | "type", | ||||
"status", | "status", | ||||
"metadata", | "metadata", | ||||
"snapshot", | "snapshot", | ||||
] | ] | ||||
origin_visit_select_cols = [ | origin_visit_select_cols = [ | ||||
"o.url AS origin", | "origin.url AS origin", | ||||
"ov.visit", | "visit", | ||||
"ov.date", | "date", | ||||
"ov.type AS type", | "origin_visit.type AS type", | ||||
"ovs.status", | "status", | ||||
"ovs.metadata", | "metadata", | ||||
"ovs.snapshot", | "snapshot", | ||||
] | ] | ||||
def _make_origin_visit_status(self, row: Tuple[Any]) -> Optional[Dict[str, Any]]: | def _make_origin_visit_status(self, row: Tuple[Any]) -> Optional[Dict[str, Any]]: | ||||
"""Make an origin_visit_status dict out of a row | """Make an origin_visit_status dict out of a row | ||||
""" | """ | ||||
if not row: | if not row: | ||||
return None | return None | ||||
Show All 26 Lines | def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None): | ||||
Yields: | Yields: | ||||
The visits for that origin | The visits for that origin | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
if last_visit: | if last_visit: | ||||
extra_condition = "and ov.visit > %s" | extra_condition = "and visit > %s" | ||||
args = (origin_id, last_visit, limit) | args = (origin_id, last_visit, limit) | ||||
else: | else: | ||||
extra_condition = "" | extra_condition = "" | ||||
args = (origin_id, limit) | args = (origin_id, limit) | ||||
query = """\ | query = """\ | ||||
SELECT DISTINCT ON (ov.visit) %s | SELECT %s | ||||
FROM origin_visit ov | FROM origin_visit | ||||
INNER JOIN origin o ON o.id = ov.origin | INNER JOIN origin ON origin.id = origin_visit.origin | ||||
INNER JOIN origin_visit_status ovs | WHERE origin.url=%%s %s | ||||
ON ov.origin = ovs.origin AND ov.visit = ovs.visit | order by visit asc | ||||
WHERE o.url=%%s %s | limit %%s""" % ( | ||||
ORDER BY ov.visit ASC, ovs.date DESC | |||||
LIMIT %%s""" % ( | |||||
", ".join(self.origin_visit_select_cols), | ", ".join(self.origin_visit_select_cols), | ||||
extra_condition, | extra_condition, | ||||
) | ) | ||||
cur.execute(query, args) | cur.execute(query, args) | ||||
yield from cur | yield from cur | ||||
def origin_visit_get(self, origin_id, visit_id, cur=None): | def origin_visit_get(self, origin_id, visit_id, cur=None): | ||||
"""Retrieve information on visit visit_id of origin origin_id. | """Retrieve information on visit visit_id of origin origin_id. | ||||
Args: | Args: | ||||
origin_id: the origin concerned | origin_id: the origin concerned | ||||
visit_id: The visit step for that origin | visit_id: The visit step for that origin | ||||
Returns: | Returns: | ||||
The origin_visit information | The origin_visit information | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """\ | query = """\ | ||||
SELECT %s | SELECT %s | ||||
FROM origin_visit ov | FROM origin_visit | ||||
INNER JOIN origin o ON o.id = ov.origin | INNER JOIN origin ON origin.id = origin_visit.origin | ||||
INNER JOIN origin_visit_status ovs | WHERE origin.url = %%s AND visit = %%s | ||||
ON ov.origin = ovs.origin AND ov.visit = ovs.visit | |||||
WHERE o.url = %%s AND ov.visit = %%s | |||||
ORDER BY ovs.date DESC | |||||
LIMIT 1 | |||||
""" % ( | """ % ( | ||||
", ".join(self.origin_visit_select_cols) | ", ".join(self.origin_visit_select_cols) | ||||
) | ) | ||||
cur.execute(query, (origin_id, visit_id)) | cur.execute(query, (origin_id, visit_id)) | ||||
r = cur.fetchall() | r = cur.fetchall() | ||||
if not r: | if not r: | ||||
return None | return None | ||||
Show All 34 Lines | ): | ||||
Returns: | Returns: | ||||
The origin_visit information, or None if no visit matches. | The origin_visit information, or None if no visit matches. | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query_parts = [ | query_parts = [ | ||||
"SELECT %s" % ", ".join(self.origin_visit_select_cols), | "SELECT %s" % ", ".join(self.origin_visit_select_cols), | ||||
"FROM origin_visit ov ", | "FROM origin_visit", | ||||
"INNER JOIN origin o ON o.id = ov.origin", | "INNER JOIN origin ON origin.id = origin_visit.origin", | ||||
"INNER JOIN origin_visit_status ovs ", | |||||
"ON o.id = ovs.origin AND ov.visit = ovs.visit ", | |||||
] | ] | ||||
query_parts.append("WHERE o.url = %s") | query_parts.append("WHERE origin.url = %s") | ||||
if require_snapshot: | if require_snapshot: | ||||
query_parts.append("AND ovs.snapshot is not null") | query_parts.append("AND snapshot is not null") | ||||
if allowed_statuses: | if allowed_statuses: | ||||
query_parts.append( | query_parts.append( | ||||
cur.mogrify("AND ovs.status IN %s", (tuple(allowed_statuses),)).decode() | cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode() | ||||
) | ) | ||||
query_parts.append( | query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1") | ||||
"ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" | |||||
) | |||||
query = "\n".join(query_parts) | query = "\n".join(query_parts) | ||||
cur.execute(query, (origin_id,)) | cur.execute(query, (origin_id,)) | ||||
r = cur.fetchone() | r = cur.fetchone() | ||||
if not r: | if not r: | ||||
return None | return None | ||||
return r | return r | ||||
def origin_visit_get_random(self, type, cur=None): | def origin_visit_get_random(self, type, cur=None): | ||||
"""Randomly select one origin visit that was full and in the last 3 | """Randomly select one origin visit that was full and in the last 3 | ||||
months | months | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
columns = ",".join(self.origin_visit_select_cols) | columns = ",".join(self.origin_visit_select_cols) | ||||
query = f"""select {columns} | query = f"""with visits as ( | ||||
from origin_visit ov | select * | ||||
inner join origin o on ov.origin=o.id | from origin_visit | ||||
inner join origin_visit_status ovs | where origin_visit.status='full' and | ||||
on ov.origin = ovs.origin and ov.visit = ovs.visit | origin_visit.type=%s and | ||||
where ovs.status='full' | origin_visit.date > now() - '3 months'::interval | ||||
and ov.type=%s | ) | ||||
and ov.date > now() - '3 months'::interval | select {columns} | ||||
and random() < 0.1 | from visits as origin_visit | ||||
inner join origin | |||||
on origin_visit.origin=origin.id | |||||
where random() < 0.1 | |||||
limit 1 | limit 1 | ||||
""" | """ | ||||
cur.execute(query, (type,)) | cur.execute(query, (type,)) | ||||
return cur.fetchone() | return cur.fetchone() | ||||
@staticmethod | @staticmethod | ||||
def mangle_query_key(key, main_table): | def mangle_query_key(key, main_table): | ||||
if key == "id": | if key == "id": | ||||
▲ Show 20 Lines • Show All 217 Lines • ▼ Show 20 Lines | ): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
if count: | if count: | ||||
origin_cols = "COUNT(*)" | origin_cols = "COUNT(*)" | ||||
else: | else: | ||||
origin_cols = ",".join(self.origin_cols) | origin_cols = ",".join(self.origin_cols) | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM origin o | FROM origin | ||||
WHERE """ | WHERE """ | ||||
if with_visit: | if with_visit: | ||||
query += """ | query += """ | ||||
EXISTS ( | EXISTS ( | ||||
SELECT 1 | SELECT 1 | ||||
FROM origin_visit ov | FROM origin_visit | ||||
INNER JOIN origin_visit_status ovs | INNER JOIN snapshot ON snapshot=snapshot.id | ||||
ON ov.origin = ovs.origin AND ov.visit = ovs.visit | WHERE origin=origin.id | ||||
INNER JOIN snapshot ON ovs.snapshot=snapshot.id | |||||
WHERE ov.origin=o.id | |||||
) | ) | ||||
AND """ | AND """ | ||||
query += "url %s %%s " | query += "url %s %%s " | ||||
if not count: | if not count: | ||||
query += "ORDER BY id OFFSET %%s LIMIT %%s" | query += "ORDER BY id OFFSET %%s LIMIT %%s" | ||||
if not regexp: | if not regexp: | ||||
query = query % (origin_cols, "ILIKE") | query = query % (origin_cols, "ILIKE") | ||||
▲ Show 20 Lines • Show All 237 Lines • Show Last 20 Lines |