diff --git a/sql/upgrades/147.sql b/sql/upgrades/147.sql new file mode 100644 --- /dev/null +++ b/sql/upgrades/147.sql @@ -0,0 +1,51 @@ +-- SWH DB schema upgrade +-- from_version: 146 +-- to_version: 147 +-- description: Add origin_visit_update table and migrate origin_visit +-- to origin_visit_update + +-- latest schema version +insert into dbversion(version, release, description) + values(147, now(), 'Work In Progress'); + +-- schema change + +-- Crawling history of software origin visits by Software Heritage. Each +-- visit see its history change through new origin visit updates +create table origin_visit_update +( + origin bigint not null, + visit bigint not null, + date timestamptz not null, + status origin_visit_status not null, + metadata jsonb, + snapshot sha1_git +); + +comment on column origin_visit_update.origin is 'origin concerned by the visit update'; +comment on column origin_visit_update.visit is 'visit concerned by the visit update'; +comment on column origin_visit_update.date is 'Visit update timestamp'; +comment on column origin_visit_update.status is 'Visit update status'; +comment on column origin_visit_update.metadata is 'Origin metadata at visit update time'; +comment on column origin_visit_update.snapshot is 'Origin snapshot at visit update time'; + +-- origin_visit_update + +create unique index concurrently origin_visit_update_pkey on origin_visit_update(origin, visit, date); +alter table origin_visit_update add primary key using index origin_visit_update_pkey; + +alter table origin_visit_update + add constraint origin_visit_update_origin_visit_fkey + foreign key (origin, visit) + references origin_visit(origin, visit) not valid; +alter table origin_visit_update validate constraint origin_visit_update_origin_visit_fkey; + + +-- data change + +-- best approximation of the visit update date is the origin_visit's date +insert into origin_visit_update (origin, visit, date, status, metadata, snapshot) +select origin, visit, date, status, 
metadata, snapshot +from origin_visit +on conflict (origin, visit, date) +do nothing; diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -6,10 +6,12 @@ import random import select +from typing import Any, Dict, Optional, Tuple + from swh.core.db import BaseDb from swh.core.db.db_utils import stored_procedure, jsonize from swh.core.db.db_utils import execute_values_generator -from swh.model.model import OriginVisit, SHA1_SIZE +from swh.model.model import OriginVisit, OriginVisitUpdate, SHA1_SIZE class Db(BaseDb): @@ -258,9 +260,13 @@ def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None): cur = self._cursor(cur) query = """\ - SELECT snapshot FROM origin_visit - INNER JOIN origin ON origin.id = origin_visit.origin - WHERE origin.url=%s AND origin_visit.visit=%s; + SELECT ovu.snapshot + FROM origin_visit ov + INNER JOIN origin o ON o.id = ov.origin + INNER JOIN origin_visit_update ovu + ON ov.origin = ovu.origin AND ov.visit = ovu.visit + WHERE o.url=%s AND ov.visit=%s + ORDER BY ovu.date DESC LIMIT 1 """ cur.execute(query, (origin_url, visit_id)) @@ -440,30 +446,33 @@ ) return cur.fetchone()[0] - def origin_visit_update(self, origin_id, visit_id, updates, cur=None): - """Update origin_visit's status.""" + origin_visit_update_cols = [ + "origin", + "visit", + "date", + "status", + "snapshot", + "metadata", + ] + + def origin_visit_update_add( + self, visit_update: OriginVisitUpdate, cur=None + ) -> None: + assert self.origin_visit_update_cols[0] == "origin" + assert self.origin_visit_update_cols[-1] == "metadata" + cols = self.origin_visit_update_cols[1:-1] cur = self._cursor(cur) - update_cols = [] - values = [] - where = ["origin.id = origin_visit.origin", "origin.url=%s", "visit=%s"] - where_values = [origin_id, visit_id] - if "status" in updates: - update_cols.append("status=%s") - values.append(updates.pop("status")) - if "metadata" in updates: - 
update_cols.append("metadata=%s") - values.append(jsonize(updates.pop("metadata"))) - if "snapshot" in updates: - update_cols.append("snapshot=%s") - values.append(updates.pop("snapshot")) - assert not updates, "Unknown fields: %r" % updates - query = """UPDATE origin_visit - SET {update_cols} - FROM origin - WHERE {where}""".format( - **{"update_cols": ", ".join(update_cols), "where": " AND ".join(where)} + cur.execute( + f"WITH origin_id as (select id from origin where url=%s) " + f"INSERT INTO origin_visit_update " + f"(origin, {', '.join(cols)}, metadata) " + f"VALUES ((select id from origin_id), " + f"{', '.join(['%s']*len(cols))}, %s) " + f"ON CONFLICT (origin, visit, date) do nothing", + [visit_update.origin] + + [getattr(visit_update, key) for key in cols] + + [jsonize(visit_update.metadata)], ) - cur.execute(query, (*values, *where_values)) def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: # doing an extra query like this is way simpler than trying to join @@ -504,15 +513,42 @@ "snapshot", ] origin_visit_select_cols = [ - "origin.url AS origin", - "visit", - "date", - "origin_visit.type AS type", - "status", - "metadata", - "snapshot", + "o.url AS origin", + "ov.visit", + "ov.date", + "ov.type AS type", + "ovu.status", + "ovu.metadata", + "ovu.snapshot", ] + def _make_origin_visit_update(self, row: Tuple[Any]) -> Optional[Dict[str, Any]]: + """Make an origin_visit_update dict out of a row + + """ + if not row: + return None + return dict(zip(self.origin_visit_update_cols, row)) + + def origin_visit_update_get_latest( + self, origin: str, visit: int, cur=None + ) -> Optional[Dict[str, Any]]: + """Given an origin visit id, return its latest origin_visit_update + + """ + cols = self.origin_visit_update_cols + cur = self._cursor(cur) + cur.execute( + f"SELECT {', '.join(cols)} " + f"FROM origin_visit_update ovu " + f"INNER JOIN origin o on o.id=ovu.origin " + f"WHERE o.url=%s AND ovu.visit=%s " + f"ORDER BY ovu.date DESC LIMIT 1", + 
(origin, visit), + ) + row = cur.fetchone() + return self._make_origin_visit_update(row) + def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None): """Retrieve all visits for origin with id origin_id. @@ -520,25 +556,27 @@ origin_id: The occurrence's origin Yields: - The occurrence's history visits + The visits for that origin """ cur = self._cursor(cur) if last_visit: - extra_condition = "and visit > %s" + extra_condition = "and ov.visit > %s" args = (origin_id, last_visit, limit) else: extra_condition = "" args = (origin_id, limit) query = """\ - SELECT %s - FROM origin_visit - INNER JOIN origin ON origin.id = origin_visit.origin - WHERE origin.url=%%s %s - order by visit asc - limit %%s""" % ( + SELECT DISTINCT ON (ov.visit) %s + FROM origin_visit ov + INNER JOIN origin o ON o.id = ov.origin + INNER JOIN origin_visit_update ovu + ON ov.origin = ovu.origin AND ov.visit = ovu.visit + WHERE o.url=%%s %s + ORDER BY ov.visit ASC, ovu.date DESC + LIMIT %%s""" % ( ", ".join(self.origin_visit_select_cols), extra_condition, ) @@ -562,9 +600,13 @@ query = """\ SELECT %s - FROM origin_visit - INNER JOIN origin ON origin.id = origin_visit.origin - WHERE origin.url = %%s AND visit = %%s + FROM origin_visit ov + INNER JOIN origin o ON o.id = ov.origin + INNER JOIN origin_visit_update ovu + ON ov.origin = ovu.origin AND ov.visit = ovu.visit + WHERE o.url = %%s AND ov.visit = %%s + ORDER BY ovu.date DESC + LIMIT 1 """ % ( ", ".join(self.origin_visit_select_cols) ) @@ -580,9 +622,11 @@ cur.execute( "SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date) ) - r = cur.fetchall() - if r: - return r[0] + rows = cur.fetchall() + if rows: + visit = dict(zip(self.origin_visit_get_cols, rows[0])) + visit["origin"] = origin + return visit def origin_visit_exists(self, origin_id, visit_id, cur=None): """Check whether an origin visit with the given ids exists""" @@ -613,21 +657,25 @@ query_parts = [ "SELECT %s" % ", 
".join(self.origin_visit_select_cols), - "FROM origin_visit", - "INNER JOIN origin ON origin.id = origin_visit.origin", + "FROM origin_visit ov ", + "INNER JOIN origin o ON o.id = ov.origin", + "INNER JOIN origin_visit_update ovu ", + "ON o.id = ovu.origin AND ov.visit = ovu.visit ", ] - query_parts.append("WHERE origin.url = %s") + query_parts.append("WHERE o.url = %s") if require_snapshot: - query_parts.append("AND snapshot is not null") + query_parts.append("AND ovu.snapshot is not null") if allowed_statuses: query_parts.append( - cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode() + cur.mogrify("AND ovu.status IN %s", (tuple(allowed_statuses),)).decode() ) - query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1") + query_parts.append( + "ORDER BY ov.date DESC, ov.visit DESC, ovu.date DESC LIMIT 1" + ) query = "\n".join(query_parts) @@ -644,18 +692,15 @@ """ cur = self._cursor(cur) columns = ",".join(self.origin_visit_select_cols) - query = f"""with visits as ( - select * - from origin_visit - where origin_visit.status='full' and - origin_visit.type=%s and - origin_visit.date > now() - '3 months'::interval - ) - select {columns} - from visits as origin_visit - inner join origin - on origin_visit.origin=origin.id - where random() < 0.1 + query = f"""select {columns} + from origin_visit ov + inner join origin o on ov.origin=o.id + inner join origin_visit_update ovu + on ov.origin = ovu.origin and ov.visit = ovu.visit + where ovu.status='full' + and ov.type=%s + and ov.date > now() - '3 months'::interval + and random() < 0.1 limit 1 """ cur.execute(query, (type,)) @@ -889,15 +934,17 @@ origin_cols = ",".join(self.origin_cols) query = """SELECT %s - FROM origin + FROM origin o WHERE """ if with_visit: query += """ EXISTS ( SELECT 1 - FROM origin_visit - INNER JOIN snapshot ON snapshot=snapshot.id - WHERE origin=origin.id + FROM origin_visit ov + INNER JOIN origin_visit_update ovu + ON ov.origin = ovu.origin AND ov.visit = ovu.visit + INNER JOIN 
snapshot ON ovu.snapshot=snapshot.id + WHERE ov.origin=o.id ) AND """ query += "url %s %%s " diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql --- a/swh/storage/sql/30-swh-schema.sql +++ b/swh/storage/sql/30-swh-schema.sql @@ -17,7 +17,7 @@ -- latest schema version insert into dbversion(version, release, description) - values(146, now(), 'Work In Progress'); + values(147, now(), 'Work In Progress'); -- a SHA1 checksum create domain sha1 as bytea check (length(value) = 20); @@ -282,6 +282,7 @@ visit bigint not null, date timestamptz not null, type text not null, + -- remove those when done migrating the schema status origin_visit_status not null, metadata jsonb, snapshot sha1_git @@ -296,6 +297,26 @@ comment on column origin_visit.snapshot is 'Origin snapshot at visit time'; +-- Crawling history of software origin visits by Software Heritage. Each +-- visit see its history change through new origin visit updates +create table origin_visit_update +( + origin bigint not null, + visit bigint not null, + date timestamptz not null, + status origin_visit_status not null, + metadata jsonb, + snapshot sha1_git +); + +comment on column origin_visit_update.origin is 'origin concerned by the visit update'; +comment on column origin_visit_update.visit is 'visit concerned by the visit update'; +comment on column origin_visit_update.date is 'Visit update timestamp'; +comment on column origin_visit_update.status is 'Visit update status'; +comment on column origin_visit_update.metadata is 'Origin metadata at visit update time'; +comment on column origin_visit_update.snapshot is 'Origin snapshot at visit update time'; + + -- A snapshot represents the entire state of a software origin as crawled by -- Software Heritage. This table is a simple mapping between (public) intrinsic -- snapshot identifiers and (private) numeric sequential identifiers. 
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql --- a/swh/storage/sql/60-swh-indexes.sql +++ b/swh/storage/sql/60-swh-indexes.sql @@ -130,6 +130,17 @@ alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid; alter table origin_visit validate constraint origin_visit_origin_fkey; +-- origin_visit_update + +create unique index concurrently origin_visit_update_pkey on origin_visit_update(origin, visit, date); +alter table origin_visit_update add primary key using index origin_visit_update_pkey; + +alter table origin_visit_update + add constraint origin_visit_update_origin_visit_fkey + foreign key (origin, visit) + references origin_visit(origin, visit) not valid; +alter table origin_visit_update validate constraint origin_visit_update_origin_visit_fkey; + -- release create unique index concurrently release_pkey on release(id); alter table release add primary key using index release_pkey; diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -23,6 +23,7 @@ Directory, Origin, OriginVisit, + OriginVisitUpdate, Revision, Release, SkippedContent, @@ -842,6 +843,7 @@ with convert_validation_exceptions(): visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) + status = "ongoing" # We can write to the journal only after inserting to the # DB, because we want the id of the visit visit = OriginVisit.from_dict( @@ -850,16 +852,42 @@ "date": date, "type": type, "visit": visit_id, - "status": "ongoing", + # TODO: Remove when we remove those fields from the model + "status": status, "metadata": None, "snapshot": None, } ) + + with convert_validation_exceptions(): + visit_update = OriginVisitUpdate( + origin=origin_url, + visit=visit_id, + date=date, + status=status, + snapshot=None, + metadata=None, + ) + self._origin_visit_update_add(visit_update, db=db, cur=cur) + self.journal_writer.origin_visit_add(visit) 
send_metric("origin_visit:add", count=1, method_name="origin_visit") return visit + def _origin_visit_update_add( + self, origin_visit_update: OriginVisitUpdate, db, cur + ) -> None: + """Add an origin visit update""" + # Inject origin visit update in the schema + db.origin_visit_update_add(origin_visit_update, cur=cur) + + # write to the journal the origin visit update + + send_metric( + "origin_visit_update:add", count=1, method_name="origin_visit_update" + ) + @timed @db_transaction() def origin_visit_update( @@ -898,8 +926,66 @@ updated_visit = OriginVisit.from_dict({**visit, **updates}) self.journal_writer.origin_visit_update(updated_visit) + last_visit_update = self._origin_visit_get_updated( + origin, visit_id, db=db, cur=cur + ) + assert last_visit_update is not None + with convert_validation_exceptions(): - db.origin_visit_update(origin_url, visit_id, updates, cur) + visit_update = OriginVisitUpdate( + origin=origin_url, + visit=visit_id, + date=date or now(), + status=status, + snapshot=snapshot or last_visit_update["snapshot"], + metadata=metadata or last_visit_update["metadata"], + ) + self._origin_visit_update_add(visit_update, db=db, cur=cur) + + def _origin_visit_get_updated( + self, origin: str, visit_id: int, db, cur + ) -> Optional[Dict[str, Any]]: + """Retrieve origin visit and latest origin visit update and merge them + into an origin visit. + + """ + row_visit = db.origin_visit_get(origin, visit_id) + if row_visit is None: + return None + visit = dict(zip(db.origin_visit_get_cols, row_visit)) + return self._origin_visit_apply_update(visit, db=db, cur=cur) + + def _origin_visit_apply_update( + self, visit: Dict[str, Any], db, cur=None + ) -> Dict[str, Any]: + """Retrieve the latest visit update information for the origin visit. + Then merge it with the visit and return it. 
+ + """ + visit_update = db.origin_visit_update_get_latest( + visit["origin"], visit["visit"] + ) + return self._origin_visit_merge(visit, visit_update) + + def _origin_visit_merge( + self, visit: Dict[str, Any], visit_update: Optional[Dict[str, Any]] + ) -> Dict[str, Any]: + """Merge origin_visit and origin_visit_update together. + + """ + return OriginVisit.from_dict( + { + # default to the values in visit + **visit, + # override with the last update + **(visit_update or {}), + # visit['origin'] is the URL (via a join), while + # visit_update['origin'] is only an id. + "origin": visit["origin"], + # but keep the date of the creation of the origin visit + "date": visit["date"], + } + ).to_dict() @timed @db_transaction() @@ -914,7 +1000,18 @@ for visit in visits: # TODO: upsert them all in a single query + assert visit.visit is not None db.origin_visit_upsert(visit, cur=cur) + with convert_validation_exceptions(): + visit_update = OriginVisitUpdate( + origin=visit.origin, + visit=visit.visit, + date=now(), + status=visit.status, + snapshot=visit.snapshot, + metadata=visit.metadata, + ) + db.origin_visit_update_add(visit_update, cur=cur) @timed @db_transaction_generator(statement_timeout=500) @@ -926,20 +1023,21 @@ db=None, cur=None, ) -> Iterable[Dict[str, Any]]: - for line in db.origin_visit_get_all( + lines = db.origin_visit_get_all( origin, last_visit=last_visit, limit=limit, cur=cur - ): - data = dict(zip(db.origin_visit_get_cols, line)) - yield data + ) + for line in lines: + visit = dict(zip(db.origin_visit_get_cols, line)) + yield self._origin_visit_apply_update(visit, db) @timed @db_transaction(statement_timeout=500) def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime, db=None, cur=None ) -> Optional[Dict[str, Any]]: - line = db.origin_visit_find_by_date(origin, visit_date, cur=cur) - if line: - return dict(zip(db.origin_visit_get_cols, line)) + visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur) + if visit: + return 
self._origin_visit_apply_update(visit, db) return None @timed @@ -947,11 +1045,11 @@ def origin_visit_get_by( self, origin: str, visit: int, db=None, cur=None ) -> Optional[Dict[str, Any]]: - ori_visit = db.origin_visit_get(origin, visit, cur) - if not ori_visit: - return None - - return dict(zip(db.origin_visit_get_cols, ori_visit)) + row = db.origin_visit_get(origin, visit, cur) + if row: + visit_dict = dict(zip(db.origin_visit_get_cols, row)) + return self._origin_visit_apply_update(visit_dict, db) + return None @timed @db_transaction(statement_timeout=4000) @@ -963,14 +1061,15 @@ db=None, cur=None, ) -> Optional[Dict[str, Any]]: - origin_visit = db.origin_visit_get_latest( + row = db.origin_visit_get_latest( origin, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, cur=cur, ) - if origin_visit: - return dict(zip(db.origin_visit_get_cols, origin_visit)) + if row: + visit = dict(zip(db.origin_visit_get_cols, row)) + return self._origin_visit_apply_update(visit, db) return None @timed @@ -978,11 +1077,11 @@ def origin_visit_get_random( self, type: str, db=None, cur=None ) -> Optional[Dict[str, Any]]: - result = db.origin_visit_get_random(type, cur) - if result: - return dict(zip(db.origin_visit_get_cols, result)) - else: - return None + row = db.origin_visit_get_random(type, cur) + if row: + visit = dict(zip(db.origin_visit_get_cols, row)) + return self._origin_visit_apply_update(visit, db) + return None @timed @db_transaction(statement_timeout=2000)