diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -260,13 +260,9 @@ def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None): cur = self._cursor(cur) query = """\ - SELECT ovs.snapshot - FROM origin_visit ov - INNER JOIN origin o ON o.id = ov.origin - INNER JOIN origin_visit_status ovs - ON ov.origin = ovs.origin AND ov.visit = ovs.visit - WHERE o.url=%s AND ov.visit=%s - ORDER BY ovs.date DESC LIMIT 1 + SELECT snapshot FROM origin_visit + INNER JOIN origin ON origin.id = origin_visit.origin + WHERE origin.url=%s AND origin_visit.visit=%s; """ cur.execute(query, (origin_url, visit_id)) @@ -458,6 +454,9 @@ def origin_visit_status_add( self, visit_status: OriginVisitStatus, cur=None ) -> None: + """Add new origin visit status + + """ assert self.origin_visit_status_cols[0] == "origin" assert self.origin_visit_status_cols[-1] == "metadata" cols = self.origin_visit_status_cols[1:-1] @@ -474,6 +473,31 @@ + [jsonize(visit_status.metadata)], ) + def origin_visit_update(self, origin_id, visit_id, updates, cur=None): + """Update origin_visit's status.""" + cur = self._cursor(cur) + update_cols = [] + values = [] + where = ["origin.id = origin_visit.origin", "origin.url=%s", "visit=%s"] + where_values = [origin_id, visit_id] + if "status" in updates: + update_cols.append("status=%s") + values.append(updates.pop("status")) + if "metadata" in updates: + update_cols.append("metadata=%s") + values.append(jsonize(updates.pop("metadata"))) + if "snapshot" in updates: + update_cols.append("snapshot=%s") + values.append(updates.pop("snapshot")) + assert not updates, "Unknown fields: %r" % updates + query = """UPDATE origin_visit + SET {update_cols} + FROM origin + WHERE {where}""".format( + **{"update_cols": ", ".join(update_cols), "where": " AND ".join(where)} + ) + cur.execute(query, (*values, *where_values)) + def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: # doing an extra query like this is way simpler than trying to join # the origin id in the query below @@ -513,13 +537,13 @@ "snapshot", ] origin_visit_select_cols = [ - "o.url AS origin", - "ov.visit", - "ov.date", - "ov.type AS type", - "ovs.status", - "ovs.metadata", - "ovs.snapshot", + "origin.url AS origin", + "visit", + "date", + "origin_visit.type AS type", + "status", + "metadata", + "snapshot", ] def _make_origin_visit_status(self, row: Tuple[Any]) -> Optional[Dict[str, Any]]: @@ -562,21 +586,19 @@ cur = self._cursor(cur) if last_visit: - extra_condition = "and ov.visit > %s" + extra_condition = "and visit > %s" args = (origin_id, last_visit, limit) else: extra_condition = "" args = (origin_id, limit) query = """\ - SELECT DISTINCT ON (ov.visit) %s - FROM origin_visit ov - INNER JOIN origin o ON o.id = ov.origin - INNER JOIN origin_visit_status ovs - ON ov.origin = ovs.origin AND ov.visit = ovs.visit - WHERE o.url=%%s %s - ORDER BY ov.visit ASC, ovs.date DESC - LIMIT %%s""" % ( + SELECT %s + FROM origin_visit + INNER JOIN origin ON origin.id = origin_visit.origin + WHERE origin.url=%%s %s + order by visit asc + limit %%s""" % ( ", ".join(self.origin_visit_select_cols), extra_condition, ) @@ -600,13 +622,9 @@ query = """\ SELECT %s - FROM origin_visit ov - INNER JOIN origin o ON o.id = ov.origin - INNER JOIN origin_visit_status ovs - ON ov.origin = ovs.origin AND ov.visit = ovs.visit - WHERE o.url = %%s AND ov.visit = %%s - ORDER BY ovs.date DESC - LIMIT 1 + FROM origin_visit + INNER JOIN origin ON origin.id = origin_visit.origin + WHERE origin.url = %%s AND visit = %%s """ % ( ", ".join(self.origin_visit_select_cols) ) @@ -657,25 +675,21 @@ query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_select_cols), - "FROM origin_visit ov ", - "INNER JOIN origin o ON o.id = ov.origin", - "INNER JOIN origin_visit_status ovs ", - "ON o.id = ovs.origin AND ov.visit = ovs.visit ", + "FROM origin_visit", + "INNER JOIN origin ON origin.id = origin_visit.origin", ] - query_parts.append("WHERE o.url = %s") + query_parts.append("WHERE origin.url = %s") if require_snapshot: - query_parts.append("AND ovs.snapshot is not null") + query_parts.append("AND snapshot is not null") if allowed_statuses: query_parts.append( - cur.mogrify("AND ovs.status IN %s", (tuple(allowed_statuses),)).decode() + cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode() ) - query_parts.append( - "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" - ) + query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1") query = "\n".join(query_parts) @@ -692,15 +706,18 @@ """ cur = self._cursor(cur) columns = ",".join(self.origin_visit_select_cols) - query = f"""select {columns} - from origin_visit ov - inner join origin o on ov.origin=o.id - inner join origin_visit_status ovs - on ov.origin = ovs.origin and ov.visit = ovs.visit - where ovs.status='full' - and ov.type=%s - and ov.date > now() - '3 months'::interval - and random() < 0.1 + query = f"""with visits as ( + select * + from origin_visit + where origin_visit.status='full' and + origin_visit.type=%s and + origin_visit.date > now() - '3 months'::interval + ) + select {columns} + from visits as origin_visit + inner join origin + on origin_visit.origin=origin.id + where random() < 0.1 limit 1 """ cur.execute(query, (type,)) @@ -934,17 +951,15 @@ origin_cols = ",".join(self.origin_cols) query = """SELECT %s - FROM origin o + FROM origin WHERE """ if with_visit: query += """ EXISTS ( SELECT 1 - FROM origin_visit ov - INNER JOIN origin_visit_status ovs - ON ov.origin = ovs.origin AND ov.visit = ovs.visit - INNER JOIN snapshot ON ovs.snapshot=snapshot.id - WHERE ov.origin=o.id + FROM origin_visit + INNER JOIN snapshot ON snapshot=snapshot.id + WHERE origin=origin.id ) AND """ query += "url %s %%s " diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -924,6 +924,10 @@ updated_visit = OriginVisit.from_dict({**visit, **updates}) self.journal_writer.origin_visit_update(updated_visit) + # Write updates to origin visit (backward compatibility) + db.origin_visit_update(origin, visit_id, updates) + + # Add new origin visit status last_visit_status = self._origin_visit_get_updated( origin, visit_id, db=db, cur=cur ) @@ -1021,12 +1025,11 @@ db=None, cur=None, ) -> Iterable[Dict[str, Any]]: - lines = db.origin_visit_get_all( + for line in db.origin_visit_get_all( origin, last_visit=last_visit, limit=limit, cur=cur - ) - for line in lines: - visit = dict(zip(db.origin_visit_get_cols, line)) - yield self._origin_visit_apply_update(visit, db) + ): + data = dict(zip(db.origin_visit_get_cols, line)) + yield data @timed @db_transaction(statement_timeout=500) @@ -1035,7 +1038,7 @@ ) -> Optional[Dict[str, Any]]: visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur) if visit: - return self._origin_visit_apply_update(visit, db) + return visit return None @timed @@ -1043,11 +1046,11 @@ def origin_visit_get_by( self, origin: str, visit: int, db=None, cur=None ) -> Optional[Dict[str, Any]]: - row = db.origin_visit_get(origin, visit, cur) - if row: - visit_dict = dict(zip(db.origin_visit_get_cols, row)) - return self._origin_visit_apply_update(visit_dict, db) - return None + ori_visit = db.origin_visit_get(origin, visit, cur) + if not ori_visit: + return None + + return dict(zip(db.origin_visit_get_cols, ori_visit)) @timed @db_transaction(statement_timeout=4000) @@ -1059,15 +1062,14 @@ db=None, cur=None, ) -> Optional[Dict[str, Any]]: - row = db.origin_visit_get_latest( + origin_visit = db.origin_visit_get_latest( origin, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, cur=cur, ) - if row: - visit = dict(zip(db.origin_visit_get_cols, row)) - return self._origin_visit_apply_update(visit, db) + if origin_visit: + return dict(zip(db.origin_visit_get_cols, origin_visit)) return None @timed @@ -1075,11 +1077,11 @@ def origin_visit_get_random( self, type: str, db=None, cur=None ) -> Optional[Dict[str, Any]]: - row = db.origin_visit_get_random(type, cur) - if row: - visit = dict(zip(db.origin_visit_get_cols, row)) - return self._origin_visit_apply_update(visit, db) - return None + result = db.origin_visit_get_random(type, cur) + if result: + return dict(zip(db.origin_visit_get_cols, result)) + else: + return None @timed @db_transaction(statement_timeout=2000)