diff --git a/sql/upgrades/147.sql b/sql/upgrades/147.sql new file mode 100644 --- /dev/null +++ b/sql/upgrades/147.sql @@ -0,0 +1,64 @@ +-- SWH DB schema upgrade +-- from_version: 146 +-- to_version: 147 +-- description: Add origin_visit_status table +-- 1. Rename enum origin_visit_status to origin_visit_state +-- 2. Add new origin_visit_status table +-- 3. Migrate origin_visit data to new origin_visit_status data + +-- latest schema version +insert into dbversion(version, release, description) + values(147, now(), 'Work In Progress'); + +-- schema change + +-- Rename old enum +alter type origin_visit_status rename to origin_visit_state; +comment on type origin_visit_state IS 'Possible visit status'; + +-- Update origin visit comment on deprecated columns +comment on column origin_visit.status is '(Deprecated) Visit status'; +comment on column origin_visit.metadata is '(Deprecated) Optional origin visit metadata'; +comment on column origin_visit.snapshot is '(Deprecated) Optional snapshot of the origin visit. It can be partial.'; + + +-- Crawling history of software origin visits by Software Heritage. 
Each +-- visit sees its history change through new origin visit status updates +create table origin_visit_status +( + origin bigint not null, + visit bigint not null, + date timestamptz not null, + status origin_visit_state not null, + metadata jsonb, + snapshot sha1_git +); + +comment on column origin_visit_status.origin is 'Origin concerned by the visit update'; +comment on column origin_visit_status.visit is 'Visit concerned by the visit update'; +comment on column origin_visit_status.date is 'Visit update timestamp'; +comment on column origin_visit_status.status is 'Visit status (ongoing, full, partial)'; +comment on column origin_visit_status.metadata is 'Optional origin visit metadata'; +comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit.'; + + +-- origin_visit_status + +create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date); +alter table origin_visit_status add primary key using index origin_visit_status_pkey; + +alter table origin_visit_status + add constraint origin_visit_status_origin_visit_fkey + foreign key (origin, visit) + references origin_visit(origin, visit) not valid; +alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey; + + +-- data change + +-- best approximation of the visit update date is the origin_visit's date +insert into origin_visit_status (origin, visit, date, status, metadata, snapshot) +select origin, visit, date, status, metadata, snapshot +from origin_visit +on conflict (origin, visit, date) +do nothing; diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -6,10 +6,12 @@ import random import select +from typing import Any, Dict, Optional, Tuple + from swh.core.db import BaseDb from swh.core.db.db_utils import stored_procedure, jsonize from swh.core.db.db_utils import execute_values_generator -from swh.model.model import 
OriginVisit, SHA1_SIZE +from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE class Db(BaseDb): @@ -258,9 +260,13 @@ def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None): cur = self._cursor(cur) query = """\ - SELECT snapshot FROM origin_visit - INNER JOIN origin ON origin.id = origin_visit.origin - WHERE origin.url=%s AND origin_visit.visit=%s; + SELECT ovs.snapshot + FROM origin_visit ov + INNER JOIN origin o ON o.id = ov.origin + INNER JOIN origin_visit_status ovs + ON ov.origin = ovs.origin AND ov.visit = ovs.visit + WHERE o.url=%s AND ov.visit=%s + ORDER BY ovs.date DESC LIMIT 1 """ cur.execute(query, (origin_url, visit_id)) @@ -440,30 +446,33 @@ ) return cur.fetchone()[0] - def origin_visit_update(self, origin_id, visit_id, updates, cur=None): - """Update origin_visit's status.""" + origin_visit_status_cols = [ + "origin", + "visit", + "date", + "status", + "snapshot", + "metadata", + ] + + def origin_visit_status_add( + self, visit_status: OriginVisitStatus, cur=None + ) -> None: + assert self.origin_visit_status_cols[0] == "origin" + assert self.origin_visit_status_cols[-1] == "metadata" + cols = self.origin_visit_status_cols[1:-1] cur = self._cursor(cur) - update_cols = [] - values = [] - where = ["origin.id = origin_visit.origin", "origin.url=%s", "visit=%s"] - where_values = [origin_id, visit_id] - if "status" in updates: - update_cols.append("status=%s") - values.append(updates.pop("status")) - if "metadata" in updates: - update_cols.append("metadata=%s") - values.append(jsonize(updates.pop("metadata"))) - if "snapshot" in updates: - update_cols.append("snapshot=%s") - values.append(updates.pop("snapshot")) - assert not updates, "Unknown fields: %r" % updates - query = """UPDATE origin_visit - SET {update_cols} - FROM origin - WHERE {where}""".format( - **{"update_cols": ", ".join(update_cols), "where": " AND ".join(where)} + cur.execute( + f"WITH origin_id as (select id from origin where url=%s) " + f"INSERT INTO 
origin_visit_status " + f"(origin, {', '.join(cols)}, metadata) " + f"VALUES ((select id from origin_id), " + f"{', '.join(['%s']*len(cols))}, %s) " + f"ON CONFLICT (origin, visit, date) do nothing", + [visit_status.origin] + + [getattr(visit_status, key) for key in cols] + + [jsonize(visit_status.metadata)], ) - cur.execute(query, (*values, *where_values)) def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None: # doing an extra query like this is way simpler than trying to join @@ -504,15 +513,42 @@ "snapshot", ] origin_visit_select_cols = [ - "origin.url AS origin", - "visit", - "date", - "origin_visit.type AS type", - "status", - "metadata", - "snapshot", + "o.url AS origin", + "ov.visit", + "ov.date", + "ov.type AS type", + "ovs.status", + "ovs.metadata", + "ovs.snapshot", ] + def _make_origin_visit_status(self, row: Tuple[Any]) -> Optional[Dict[str, Any]]: + """Make an origin_visit_status dict out of a row + + """ + if not row: + return None + return dict(zip(self.origin_visit_status_cols, row)) + + def origin_visit_status_get_latest( + self, origin: str, visit: int, cur=None + ) -> Optional[Dict[str, Any]]: + """Given an origin visit id, return its latest origin_visit_status + + """ + cols = self.origin_visit_status_cols + cur = self._cursor(cur) + cur.execute( + f"SELECT {', '.join(cols)} " + f"FROM origin_visit_status ovs " + f"INNER JOIN origin o on o.id=ovs.origin " + f"WHERE o.url=%s AND ovs.visit=%s " + f"ORDER BY ovs.date DESC LIMIT 1", + (origin, visit), + ) + row = cur.fetchone() + return self._make_origin_visit_status(row) + def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None): """Retrieve all visits for origin with id origin_id. 
@@ -520,25 +556,27 @@ origin_id: The occurrence's origin Yields: - The occurrence's history visits + The visits for that origin """ cur = self._cursor(cur) if last_visit: - extra_condition = "and visit > %s" + extra_condition = "and ov.visit > %s" args = (origin_id, last_visit, limit) else: extra_condition = "" args = (origin_id, limit) query = """\ - SELECT %s - FROM origin_visit - INNER JOIN origin ON origin.id = origin_visit.origin - WHERE origin.url=%%s %s - order by visit asc - limit %%s""" % ( + SELECT DISTINCT ON (ov.visit) %s + FROM origin_visit ov + INNER JOIN origin o ON o.id = ov.origin + INNER JOIN origin_visit_status ovs + ON ov.origin = ovs.origin AND ov.visit = ovs.visit + WHERE o.url=%%s %s + ORDER BY ov.visit ASC, ovs.date DESC + LIMIT %%s""" % ( ", ".join(self.origin_visit_select_cols), extra_condition, ) @@ -562,9 +600,13 @@ query = """\ SELECT %s - FROM origin_visit - INNER JOIN origin ON origin.id = origin_visit.origin - WHERE origin.url = %%s AND visit = %%s + FROM origin_visit ov + INNER JOIN origin o ON o.id = ov.origin + INNER JOIN origin_visit_status ovs + ON ov.origin = ovs.origin AND ov.visit = ovs.visit + WHERE o.url = %%s AND ov.visit = %%s + ORDER BY ovs.date DESC + LIMIT 1 """ % ( ", ".join(self.origin_visit_select_cols) ) @@ -580,9 +622,11 @@ cur.execute( "SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date) ) - r = cur.fetchall() - if r: - return r[0] + rows = cur.fetchall() + if rows: + visit = dict(zip(self.origin_visit_get_cols, rows[0])) + visit["origin"] = origin + return visit def origin_visit_exists(self, origin_id, visit_id, cur=None): """Check whether an origin visit with the given ids exists""" @@ -613,21 +657,25 @@ query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_select_cols), - "FROM origin_visit", - "INNER JOIN origin ON origin.id = origin_visit.origin", + "FROM origin_visit ov ", + "INNER JOIN origin o ON o.id = ov.origin", + "INNER JOIN origin_visit_status ovs ", + "ON o.id = ovs.origin AND 
ov.visit = ovs.visit ", ] - query_parts.append("WHERE origin.url = %s") + query_parts.append("WHERE o.url = %s") if require_snapshot: - query_parts.append("AND snapshot is not null") + query_parts.append("AND ovs.snapshot is not null") if allowed_statuses: query_parts.append( - cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode() + cur.mogrify("AND ovs.status IN %s", (tuple(allowed_statuses),)).decode() ) - query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1") + query_parts.append( + "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" + ) query = "\n".join(query_parts) @@ -644,18 +692,15 @@ """ cur = self._cursor(cur) columns = ",".join(self.origin_visit_select_cols) - query = f"""with visits as ( - select * - from origin_visit - where origin_visit.status='full' and - origin_visit.type=%s and - origin_visit.date > now() - '3 months'::interval - ) - select {columns} - from visits as origin_visit - inner join origin - on origin_visit.origin=origin.id - where random() < 0.1 + query = f"""select {columns} + from origin_visit ov + inner join origin o on ov.origin=o.id + inner join origin_visit_status ovs + on ov.origin = ovs.origin and ov.visit = ovs.visit + where ovs.status='full' + and ov.type=%s + and ov.date > now() - '3 months'::interval + and random() < 0.1 limit 1 """ cur.execute(query, (type,)) @@ -889,15 +934,17 @@ origin_cols = ",".join(self.origin_cols) query = """SELECT %s - FROM origin + FROM origin o WHERE """ if with_visit: query += """ EXISTS ( SELECT 1 - FROM origin_visit - INNER JOIN snapshot ON snapshot=snapshot.id - WHERE origin=origin.id + FROM origin_visit ov + INNER JOIN origin_visit_status ovs + ON ov.origin = ovs.origin AND ov.visit = ovs.visit + INNER JOIN snapshot ON ovs.snapshot=snapshot.id + WHERE ov.origin=o.id ) AND """ query += "url %s %%s " diff --git a/swh/storage/sql/20-swh-enums.sql b/swh/storage/sql/20-swh-enums.sql --- a/swh/storage/sql/20-swh-enums.sql +++ b/swh/storage/sql/20-swh-enums.sql @@ -14,9 
+14,9 @@ create type snapshot_target as enum ('content', 'directory', 'revision', 'release', 'snapshot', 'alias'); comment on type snapshot_target is 'Types of targets for snapshot branches'; -create type origin_visit_status as enum ( +create type origin_visit_state as enum ( 'ongoing', 'full', 'partial' ); -comment on type origin_visit_status IS 'Possible visit status'; +comment on type origin_visit_state IS 'Possible visit status'; diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql --- a/swh/storage/sql/30-swh-schema.sql +++ b/swh/storage/sql/30-swh-schema.sql @@ -17,7 +17,7 @@ -- latest schema version insert into dbversion(version, release, description) - values(146, now(), 'Work In Progress'); + values(147, now(), 'Work In Progress'); -- a SHA1 checksum create domain sha1 as bytea check (length(value) = 20); @@ -282,7 +282,8 @@ visit bigint not null, date timestamptz not null, type text not null, - status origin_visit_status not null, + -- remove those when done migrating the schema + status origin_visit_state not null, metadata jsonb, snapshot sha1_git ); @@ -291,9 +292,29 @@ comment on column origin_visit.visit is 'Sequential visit number for the origin'; comment on column origin_visit.date is 'Visit timestamp'; comment on column origin_visit.type is 'Type of loader that did the visit (hg, git, ...)'; -comment on column origin_visit.status is 'Visit result'; -comment on column origin_visit.metadata is 'Origin metadata at visit time'; -comment on column origin_visit.snapshot is 'Origin snapshot at visit time'; +comment on column origin_visit.status is '(Deprecated) Visit status'; +comment on column origin_visit.metadata is '(Deprecated) Optional origin visit metadata'; +comment on column origin_visit.snapshot is '(Deprecated) Optional snapshot of the origin visit. It can be partial.'; + + +-- Crawling history of software origin visits by Software Heritage. 
Each +-- visit sees its history change through new origin visit status updates +create table origin_visit_status +( + origin bigint not null, + visit bigint not null, + date timestamptz not null, + status origin_visit_state not null, + metadata jsonb, + snapshot sha1_git +); + +comment on column origin_visit_status.origin is 'Origin concerned by the visit update'; +comment on column origin_visit_status.visit is 'Visit concerned by the visit update'; +comment on column origin_visit_status.date is 'Visit update timestamp'; +comment on column origin_visit_status.status is 'Visit status (ongoing, full, partial)'; +comment on column origin_visit_status.metadata is 'Optional origin visit metadata'; +comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit.'; -- A snapshot represents the entire state of a software origin as crawled by diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql --- a/swh/storage/sql/60-swh-indexes.sql +++ b/swh/storage/sql/60-swh-indexes.sql @@ -130,6 +130,17 @@ alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid; alter table origin_visit validate constraint origin_visit_origin_fkey; +-- origin_visit_status + +create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date); +alter table origin_visit_status add primary key using index origin_visit_status_pkey; + +alter table origin_visit_status + add constraint origin_visit_status_origin_visit_fkey + foreign key (origin, visit) + references origin_visit(origin, visit) not valid; +alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey; + -- release create unique index concurrently release_pkey on release(id); alter table release add primary key using index release_pkey; diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ 
b/swh/storage/storage.py @@ -23,6 +23,7 @@ Directory, Origin, OriginVisit, + OriginVisitStatus, Revision, Release, SkippedContent, @@ -843,6 +844,7 @@ with convert_validation_exceptions(): visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) + status = "ongoing" # We can write to the journal only after inserting to the # DB, because we want the id of the visit visit = OriginVisit.from_dict( @@ -851,16 +853,42 @@ "date": date, "type": type, "visit": visit_id, - "status": "ongoing", + # TODO: Remove when we remove those fields from the model + "status": status, "metadata": None, "snapshot": None, } ) + + with convert_validation_exceptions(): + visit_status = OriginVisitStatus( + origin=origin_url, + visit=visit_id, + date=date, + status=status, + snapshot=None, + metadata=None, + ) + self._origin_visit_status_add(visit_status, db=db, cur=cur) + self.journal_writer.origin_visit_add(visit) send_metric("origin_visit:add", count=1, method_name="origin_visit") return visit + def _origin_visit_status_add( + self, origin_visit_status: OriginVisitStatus, db, cur + ) -> None: + """Add an origin visit update""" + # Inject origin visit update in the schema + db.origin_visit_status_add(origin_visit_status, cur=cur) + + # write to the journal the origin visit update + + send_metric( + "origin_visit_status:add", count=1, method_name="origin_visit_status" + ) + @timed @db_transaction() def origin_visit_update( @@ -899,8 +927,66 @@ updated_visit = OriginVisit.from_dict({**visit, **updates}) self.journal_writer.origin_visit_update(updated_visit) + last_visit_status = self._origin_visit_get_updated( + origin, visit_id, db=db, cur=cur + ) + assert last_visit_status is not None + with convert_validation_exceptions(): - db.origin_visit_update(origin_url, visit_id, updates, cur) + visit_status = OriginVisitStatus( + origin=origin_url, + visit=visit_id, + date=date or now(), + status=status, + snapshot=snapshot or last_visit_status["snapshot"], + metadata=metadata or 
last_visit_status["metadata"], + ) + self._origin_visit_status_add(visit_status, db=db, cur=cur) + + def _origin_visit_get_updated( + self, origin: str, visit_id: int, db, cur + ) -> Optional[Dict[str, Any]]: + """Retrieve origin visit and latest origin visit update and merge them + into an origin visit. + + """ + row_visit = db.origin_visit_get(origin, visit_id, cur) + if row_visit is None: + return None + visit = dict(zip(db.origin_visit_get_cols, row_visit)) + return self._origin_visit_apply_update(visit, db=db, cur=cur) + + def _origin_visit_apply_update( + self, visit: Dict[str, Any], db, cur=None + ) -> Dict[str, Any]: + """Retrieve the latest visit update information for the origin visit. + Then merge it with the visit and return it. + + """ + visit_status = db.origin_visit_status_get_latest( + visit["origin"], visit["visit"], cur=cur + ) + return self._origin_visit_merge(visit, visit_status) + + def _origin_visit_merge( + self, visit: Dict[str, Any], visit_status: Optional[Dict[str, Any]] + ) -> Dict[str, Any]: + """Merge origin_visit and origin_visit_status together. + + """ + return OriginVisit.from_dict( + { + # default to the values in visit + **visit, + # override with the last update + **(visit_status or {}), + # visit['origin'] is the URL (via a join), while + # visit_status['origin'] is only an id. 
+ "origin": visit["origin"], + # but keep the date of the creation of the origin visit + "date": visit["date"], + } + ).to_dict() @timed @db_transaction() @@ -915,7 +1001,18 @@ for visit in visits: # TODO: upsert them all in a single query + assert visit.visit is not None db.origin_visit_upsert(visit, cur=cur) + with convert_validation_exceptions(): + visit_status = OriginVisitStatus( + origin=visit.origin, + visit=visit.visit, + date=now(), + status=visit.status, + snapshot=visit.snapshot, + metadata=visit.metadata, + ) + db.origin_visit_status_add(visit_status, cur=cur) @timed @db_transaction_generator(statement_timeout=500) @@ -927,20 +1024,21 @@ db=None, cur=None, ) -> Iterable[Dict[str, Any]]: - for line in db.origin_visit_get_all( + lines = db.origin_visit_get_all( origin, last_visit=last_visit, limit=limit, cur=cur - ): - data = dict(zip(db.origin_visit_get_cols, line)) - yield data + ) + for line in lines: + visit = dict(zip(db.origin_visit_get_cols, line)) + yield self._origin_visit_apply_update(visit, db) @timed @db_transaction(statement_timeout=500) def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime, db=None, cur=None ) -> Optional[Dict[str, Any]]: - line = db.origin_visit_find_by_date(origin, visit_date, cur=cur) - if line: - return dict(zip(db.origin_visit_get_cols, line)) + visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur) + if visit: + return self._origin_visit_apply_update(visit, db) return None @timed @@ -948,11 +1046,11 @@ def origin_visit_get_by( self, origin: str, visit: int, db=None, cur=None ) -> Optional[Dict[str, Any]]: - ori_visit = db.origin_visit_get(origin, visit, cur) - if not ori_visit: - return None - - return dict(zip(db.origin_visit_get_cols, ori_visit)) + row = db.origin_visit_get(origin, visit, cur) + if row: + visit_dict = dict(zip(db.origin_visit_get_cols, row)) + return self._origin_visit_apply_update(visit_dict, db) + return None @timed @db_transaction(statement_timeout=4000) @@ 
-964,14 +1062,15 @@ db=None, cur=None, ) -> Optional[Dict[str, Any]]: - origin_visit = db.origin_visit_get_latest( + row = db.origin_visit_get_latest( origin, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, cur=cur, ) - if origin_visit: - return dict(zip(db.origin_visit_get_cols, origin_visit)) + if row: + visit = dict(zip(db.origin_visit_get_cols, row)) + return self._origin_visit_apply_update(visit, db) return None @timed @@ -979,11 +1078,11 @@ def origin_visit_get_random( self, type: str, db=None, cur=None ) -> Optional[Dict[str, Any]]: - result = db.origin_visit_get_random(type, cur) - if result: - return dict(zip(db.origin_visit_get_cols, result)) - else: - return None + row = db.origin_visit_get_random(type, cur) + if row: + visit = dict(zip(db.origin_visit_get_cols, row)) + return self._origin_visit_apply_update(visit, db) + return None @timed @db_transaction(statement_timeout=2000)