diff --git a/sql/upgrades/147.sql b/sql/upgrades/147.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/147.sql
@@ -0,0 +1,51 @@
+-- SWH DB schema upgrade
+-- from_version: 146
+-- to_version: 147
+-- description: Add origin_visit_update table and migrate origin_visit
+--              to origin_visit_update
+
+-- latest schema version
+insert into dbversion(version, release, description)
+      values(147, now(), 'Work In Progress');
+
+-- schema change
+
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit updates
+create table origin_visit_update
+(
+  origin   bigint not null,
+  visit    bigint not null,
+  date     timestamptz not null,
+  status   origin_visit_status not null,
+  metadata jsonb,
+  snapshot sha1_git
+);
+
+comment on column origin_visit_update.origin is 'origin concerned by the visit update';
+comment on column origin_visit_update.visit is 'visit concerned by the visit update';
+comment on column origin_visit_update.date is 'Visit update timestamp';
+comment on column origin_visit_update.status is 'Visit update status';
+comment on column origin_visit_update.metadata is 'Origin metadata at visit update time';
+comment on column origin_visit_update.snapshot is 'Origin snapshot at visit update time';
+
+-- origin_visit_update
+
+create unique index concurrently origin_visit_update_pkey on origin_visit_update(origin, visit, date);
+alter table origin_visit_update add primary key using index origin_visit_update_pkey;
+
+alter table origin_visit_update
+  add constraint origin_visit_update_origin_visit_fkey
+  foreign key (origin, visit)
+  references origin_visit(origin, visit) not valid;
+alter table origin_visit_update validate constraint origin_visit_update_origin_visit_fkey;
+
+
+-- data change
+
+-- best approximation of the visit update date is the origin_visit's date
+insert into origin_visit_update (origin, visit, date, status, metadata, snapshot)
+select origin, visit, date, status, metadata, snapshot
+from origin_visit
+on conflict (origin, visit, date)
+do nothing;
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -6,10 +6,12 @@
 import random
 import select
 
+from typing import Any, Dict, Optional, Tuple
+
 from swh.core.db import BaseDb
 from swh.core.db.db_utils import stored_procedure, jsonize
 from swh.core.db.db_utils import execute_values_generator
-from swh.model.model import OriginVisit, SHA1_SIZE
+from swh.model.model import OriginVisit, OriginVisitUpdate, SHA1_SIZE
 
 
 class Db(BaseDb):
@@ -209,9 +211,13 @@
     def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
         cur = self._cursor(cur)
         query = """\
-           SELECT snapshot FROM origin_visit
-           INNER JOIN origin ON origin.id = origin_visit.origin
-           WHERE origin.url=%s AND origin_visit.visit=%s;
+           SELECT ovu.snapshot
+           FROM origin_visit ov
+           INNER JOIN origin o ON o.id = ov.origin
+           INNER JOIN origin_visit_update ovu
+             ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+           WHERE o.url=%s AND ov.visit=%s
+           ORDER BY ovu.date DESC LIMIT 1
         """
 
         cur.execute(query, (origin_url, visit_id))
@@ -351,33 +357,25 @@
                     (origin, ts, type))
         return cur.fetchone()[0]
 
-    def origin_visit_update(self, origin_id, visit_id, updates, cur=None):
-        """Update origin_visit's status."""
-        cur = self._cursor(cur)
-        update_cols = []
-        values = []
-        where = ['origin.id = origin_visit.origin',
-                 'origin.url=%s',
-                 'visit=%s']
-        where_values = [origin_id, visit_id]
-        if 'status' in updates:
-            update_cols.append('status=%s')
-            values.append(updates.pop('status'))
-        if 'metadata' in updates:
-            update_cols.append('metadata=%s')
-            values.append(jsonize(updates.pop('metadata')))
-        if 'snapshot' in updates:
-            update_cols.append('snapshot=%s')
-            values.append(updates.pop('snapshot'))
-        assert not updates, 'Unknown fields: %r' % updates
-        query = """UPDATE origin_visit
-                    SET {update_cols}
-                    FROM origin
-                    WHERE {where}""".format(**{
-                        'update_cols': ', '.join(update_cols),
-                        'where': ' AND '.join(where)
-                    })
-        cur.execute(query, (*values, *where_values))
+    origin_visit_update_cols = [
+        'origin', 'visit', 'date', 'status', 'snapshot', 'metadata']
+
+    def origin_visit_update_add(
+            self, visit_update: OriginVisitUpdate, cur=None) -> None:
+        assert self.origin_visit_update_cols[0] == 'origin'
+        assert self.origin_visit_update_cols[-1] == 'metadata'
+        cols = self.origin_visit_update_cols[1:-1]
+        cur = self._cursor(cur)
+        cur.execute(
+            f"WITH origin_id as (select id from origin where url=%s) "
+            f"INSERT INTO origin_visit_update "
+            f"(origin, {', '.join(cols)}, metadata) "
+            f"VALUES ((select id from origin_id), "
+            f"{', '.join(['%s']*len(cols))}, %s) "
+            f"ON CONFLICT (origin, visit, date) do nothing",
+            [visit_update.origin] +
+            [getattr(visit_update, key) for key in cols] +
+            [jsonize(visit_update.metadata)])
 
     def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None:
         # doing an extra query like this is way simpler than trying to join
@@ -401,8 +399,34 @@
         'origin', 'visit', 'date', 'type', 'status', 'metadata', 'snapshot']
 
     origin_visit_select_cols = [
-        'origin.url AS origin', 'visit', 'date', 'origin_visit.type AS type',
-        'status', 'metadata', 'snapshot']
+        'o.url AS origin', 'ov.visit', 'ov.date', 'ov.type AS type',
+        'ovu.status', 'ovu.metadata', 'ovu.snapshot']
+
+    def _make_origin_visit_update(
+            self, row: Optional[Tuple[Any, ...]]) -> Optional[Dict[str, Any]]:
+        """Make an origin_visit_update dict out of a row (None if no row)
+
+        """
+        if not row:
+            return None
+        return dict(zip(self.origin_visit_update_cols, row))
+
+    def origin_visit_update_get_latest(
+            self, origin: str, visit: int,
+            cur=None) -> Optional[Dict[str, Any]]:
+        """Given an origin url and visit id, return its latest update
+
+        """
+        cols = self.origin_visit_update_cols
+        cur = self._cursor(cur)
+        cur.execute(f"SELECT {', '.join(cols)} "
+                    f"FROM origin_visit_update ovu "
+                    f"INNER JOIN origin o on o.id=ovu.origin "
+                    f"WHERE o.url=%s AND ovu.visit=%s "
+                    f"ORDER BY ovu.date DESC LIMIT 1",
+                    (origin, visit))
+        row = cur.fetchone()
+        return self._make_origin_visit_update(row)
 
     def origin_visit_get_all(self, origin_id, last_visit=None, limit=None,
                              cur=None):
@@ -412,25 +436,27 @@
             origin_id: The occurrence's origin
 
         Yields:
-            The occurrence's history visits
+            The visits for that origin
 
         """
         cur = self._cursor(cur)
 
         if last_visit:
-            extra_condition = 'and visit > %s'
+            extra_condition = 'and ov.visit > %s'
             args = (origin_id, last_visit, limit)
         else:
             extra_condition = ''
             args = (origin_id, limit)
 
         query = """\
-        SELECT %s
-        FROM origin_visit
-        INNER JOIN origin ON origin.id = origin_visit.origin
-        WHERE origin.url=%%s %s
-        order by visit asc
-        limit %%s""" % (
+        SELECT DISTINCT ON (ov.visit) %s
+        FROM origin_visit ov
+        INNER JOIN origin o ON o.id = ov.origin
+        INNER JOIN origin_visit_update ovu
+          ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+        WHERE o.url=%%s %s
+        ORDER BY ov.visit ASC, ovu.date DESC
+        LIMIT %%s""" % (
             ', '.join(self.origin_visit_select_cols), extra_condition
         )
 
@@ -453,9 +479,13 @@
 
         query = """\
             SELECT %s
-            FROM origin_visit
-            INNER JOIN origin ON origin.id = origin_visit.origin
-            WHERE origin.url = %%s AND visit = %%s
+            FROM origin_visit ov
+            INNER JOIN origin o ON o.id = ov.origin
+            INNER JOIN origin_visit_update ovu
+              ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+            WHERE o.url = %%s AND ov.visit = %%s
+            ORDER BY ovu.date DESC
+            LIMIT 1
             """ % (', '.join(self.origin_visit_select_cols))
 
         cur.execute(query, (origin_id, visit_id))
@@ -469,9 +499,11 @@
         cur.execute(
             'SELECT * FROM swh_visit_find_by_date(%s, %s)',
             (origin, visit_date))
-        r = cur.fetchall()
-        if r:
-            return r[0]
+        rows = cur.fetchall()
+        if rows:
+            visit = dict(zip(self.origin_visit_get_cols, rows[0]))
+            visit['origin'] = origin
+            return visit
 
     def origin_visit_exists(self, origin_id, visit_id, cur=None):
         """Check whether an origin visit with the given ids exists"""
@@ -502,20 +534,24 @@
 
         query_parts = [
             'SELECT %s' % ', '.join(self.origin_visit_select_cols),
-            'FROM origin_visit',
-            'INNER JOIN origin ON origin.id = origin_visit.origin']
+            'FROM origin_visit ov ',
+            'INNER JOIN origin o ON o.id = ov.origin',
+            'INNER JOIN origin_visit_update ovu ',
+            'ON o.id = ovu.origin AND ov.visit = ovu.visit ',
+        ]
 
-        query_parts.append('WHERE origin.url = %s')
+        query_parts.append('WHERE o.url = %s')
 
         if require_snapshot:
-            query_parts.append('AND snapshot is not null')
+            query_parts.append('AND ovu.snapshot is not null')
 
         if allowed_statuses:
             query_parts.append(
-                cur.mogrify('AND status IN %s',
+                cur.mogrify('AND ovu.status IN %s',
                             (tuple(allowed_statuses),)).decode())
 
-        query_parts.append('ORDER BY date DESC, visit DESC LIMIT 1')
+        query_parts.append(
+            'ORDER BY ov.date DESC, ov.visit DESC, ovu.date DESC LIMIT 1')
 
         query = '\n'.join(query_parts)
 
@@ -532,18 +568,15 @@
         """
         cur = self._cursor(cur)
         columns = ','.join(self.origin_visit_select_cols)
-        query = f"""with visits as (
-                      select *
-                      from origin_visit
-                      where origin_visit.status='full' and
-                            origin_visit.type=%s and
-                            origin_visit.date > now() - '3 months'::interval
-                    )
-                    select {columns}
-                    from visits as origin_visit
-                    inner join origin
-                    on origin_visit.origin=origin.id
-                    where random() < 0.1
+        query = f"""select {columns}
+                    from origin_visit ov
+                    inner join origin o on ov.origin=o.id
+                    inner join origin_visit_update ovu
+                      on ov.origin = ovu.origin and ov.visit = ovu.visit
+                    where ovu.status='full'
+                      and ov.type=%s
+                      and ov.date > now() - '3 months'::interval
+                      and random() < 0.1
                     limit 1
                  """
         cur.execute(query, (type, ))
@@ -753,15 +786,17 @@
 
         origin_cols = ','.join(self.origin_cols)
         query = """SELECT %s
-                   FROM origin
+                   FROM origin o
                    WHERE """
         if with_visit:
             query += """
                    EXISTS (
                      SELECT 1
-                     FROM origin_visit
-                     INNER JOIN snapshot ON snapshot=snapshot.id
-                     WHERE origin=origin.id
+                     FROM origin_visit ov
+                     INNER JOIN origin_visit_update ovu
+                       ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+                     INNER JOIN snapshot ON ovu.snapshot=snapshot.id
+                     WHERE ov.origin=o.id
                    )
                    AND """
         query += 'url %s %%s '
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -841,7 +841,11 @@
             self._objects[visit_key].append(
                 ('origin_visit', None))
 
-    def _origin_visit_get_updated(self, origin: str, visit_id: int):
+    def _origin_visit_get_updated(
+            self, origin: str, visit_id: int) -> Optional[OriginVisit]:
+        """Merge origin visit and latest origin visit update
+
+        """
         assert visit_id >= 1
         visit = self._origin_visits[origin][visit_id-1]
         if visit is None:
@@ -946,6 +950,7 @@
         for visit in random_origin_visits:
             updated_visit = self._origin_visit_get_updated(
                 url, visit.visit)
+            assert updated_visit is not None
             if updated_visit.date > back_in_the_day \
                     and updated_visit.status == 'full':
                 return updated_visit.to_dict()
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -17,7 +17,7 @@
 
 -- latest schema version
 insert into dbversion(version, release, description)
-      values(146, now(), 'Work In Progress');
+      values(147, now(), 'Work In Progress');
 
 -- a SHA1 checksum
 create domain sha1 as bytea check (length(value) = 20);
@@ -282,6 +282,7 @@
   visit    bigint not null,
   date     timestamptz not null,
   type     text not null,
+  -- remove those when done migrating the schema
   status   origin_visit_status not null,
   metadata jsonb,
   snapshot sha1_git
@@ -296,6 +297,26 @@
 comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
 
 
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit updates
+create table origin_visit_update
+(
+  origin   bigint not null,
+  visit    bigint not null,
+  date     timestamptz not null,
+  status   origin_visit_status not null,
+  metadata jsonb,
+  snapshot sha1_git
+);
+
+comment on column origin_visit_update.origin is 'origin concerned by the visit update';
+comment on column origin_visit_update.visit is 'visit concerned by the visit update';
+comment on column origin_visit_update.date is 'Visit update timestamp';
+comment on column origin_visit_update.status is 'Visit update status';
+comment on column origin_visit_update.metadata is 'Origin metadata at visit update time';
+comment on column origin_visit_update.snapshot is 'Origin snapshot at visit update time';
+
+
 -- A snapshot represents the entire state of a software origin as crawled by
 -- Software Heritage. This table is a simple mapping between (public) intrinsic
 -- snapshot identifiers and (private) numeric sequential identifiers.
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql
--- a/swh/storage/sql/60-swh-indexes.sql
+++ b/swh/storage/sql/60-swh-indexes.sql
@@ -130,6 +130,17 @@
 alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid;
 alter table origin_visit validate constraint origin_visit_origin_fkey;
 
+-- origin_visit_update
+
+create unique index concurrently origin_visit_update_pkey on origin_visit_update(origin, visit, date);
+alter table origin_visit_update add primary key using index origin_visit_update_pkey;
+
+alter table origin_visit_update
+  add constraint origin_visit_update_origin_visit_fkey
+  foreign key (origin, visit)
+  references origin_visit(origin, visit) not valid;
+alter table origin_visit_update validate constraint origin_visit_update_origin_visit_fkey;
+
 -- release
 create unique index concurrently release_pkey on release(id);
 alter table release add primary key using index release_pkey;
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -19,7 +19,7 @@
 import psycopg2.errors
 
 from swh.model.model import (
-    Content, Directory, Origin, OriginVisit,
+    Content, Directory, Origin, OriginVisit, OriginVisitUpdate,
     Revision, Release, SkippedContent, Snapshot, SHA1_SIZE
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
@@ -807,6 +807,7 @@
         with convert_validation_exceptions():
             visit_id = db.origin_visit_add(origin_url, date, type, cur=cur)
 
+        status = 'ongoing'
         # We can write to the journal only after inserting to the
         # DB, because we want the id of the visit
         visit = OriginVisit.from_dict({
@@ -814,15 +815,39 @@
             'date': date,
             'type': type,
             'visit': visit_id,
-            'status': 'ongoing',
+            # TODO: Remove when we remove those fields from the model
+            'status': status,
             'metadata': None,
             'snapshot': None
         })
+
+        with convert_validation_exceptions():
+            visit_update = OriginVisitUpdate(
+                origin=origin_url,
+                visit=visit_id,
+                date=date,
+                status=status,
+                snapshot=None,
+                metadata=None,
+            )
+        self._origin_visit_update_add(visit_update, db=db, cur=cur)
+
         self.journal_writer.origin_visit_add(visit)
 
         send_metric('origin_visit:add', count=1, method_name='origin_visit')
         return visit
 
+    def _origin_visit_update_add(self, origin_visit_update: OriginVisitUpdate,
+                                 db, cur) -> None:
+        """Add an origin visit update"""
+        # Inject origin visit update in the schema
+        db.origin_visit_update_add(origin_visit_update, cur=cur)
+
+        # write to the journal the origin visit update
+
+        send_metric('origin_visit_update:add',
+                    count=1, method_name='origin_visit_update')
+
     @timed
     @db_transaction()
     def origin_visit_update(self, origin: str, visit_id: int,
@@ -854,8 +879,61 @@
         updated_visit = {**visit, **updates}
         self.journal_writer.origin_visit_update(updated_visit)
 
+        last_visit_update = self._origin_visit_get_updated(
+            origin, visit_id, db=db, cur=cur)
+        assert last_visit_update is not None
+
         with convert_validation_exceptions():
-            db.origin_visit_update(origin_url, visit_id, updates, cur)
+            visit_update = OriginVisitUpdate(
+                origin=origin_url,
+                visit=visit_id,
+                date=date or now(),
+                status=status,
+                snapshot=snapshot or last_visit_update['snapshot'],
+                metadata=metadata or last_visit_update['metadata'],
+            )
+        self._origin_visit_update_add(visit_update, db=db, cur=cur)
+
+    def _origin_visit_get_updated(
+            self, origin: str, visit_id: int,
+            db, cur) -> Optional[Dict[str, Any]]:
+        """Retrieve origin visit and latest origin visit update and merge them
+        into an origin visit.
+
+        """
+        row_visit = db.origin_visit_get(origin, visit_id, cur)
+        if row_visit is None:
+            return None
+        visit = dict(zip(db.origin_visit_get_cols, row_visit))
+        return self._origin_visit_apply_update(visit, db=db, cur=cur)
+
+    def _origin_visit_apply_update(
+            self, visit: Dict[str, Any], db, cur=None) -> Dict[str, Any]:
+        """Retrieve the latest visit update information for the origin visit.
+        Then merge it with the visit and return it.
+
+        """
+        visit_update = db.origin_visit_update_get_latest(
+            visit['origin'], visit['visit'], cur=cur)
+        return self._origin_visit_merge(visit, visit_update)
+
+    def _origin_visit_merge(
+            self, visit: Dict[str, Any],
+            visit_update: Optional[Dict[str, Any]]) -> Dict[str, Any]:
+        """Merge origin_visit and origin_visit_update (if any) together.
+
+        """
+        return OriginVisit.from_dict({
+            # default to the values in visit
+            **visit,
+            # override with the last update (None when there is no update)
+            **(visit_update or {}),
+            # visit['origin'] is the URL (via a join), while
+            # visit_update['origin'] is only an id.
+            'origin': visit['origin'],
+            # but keep the date of the creation of the origin visit
+            'date': visit['date']
+        }).to_dict()
 
     @timed
     @db_transaction()
@@ -870,7 +948,18 @@
 
         for visit in visits:
             # TODO: upsert them all in a single query
+            assert visit.visit is not None
             db.origin_visit_upsert(visit, cur=cur)
+            with convert_validation_exceptions():
+                visit_update = OriginVisitUpdate(
+                    origin=visit.origin,
+                    visit=visit.visit,
+                    date=now(),
+                    status=visit.status,
+                    snapshot=visit.snapshot,
+                    metadata=visit.metadata,
+                )
+            db.origin_visit_update_add(visit_update, cur=cur)
 
     @timed
     @db_transaction_generator(statement_timeout=500)
@@ -878,19 +967,20 @@
             self, origin: str, last_visit: Optional[int] = None,
             limit: Optional[int] = None,
             db=None, cur=None) -> Iterable[Dict[str, Any]]:
-        for line in db.origin_visit_get_all(
-                origin, last_visit=last_visit, limit=limit, cur=cur):
-            data = dict(zip(db.origin_visit_get_cols, line))
-            yield data
+        lines = db.origin_visit_get_all(
+            origin, last_visit=last_visit, limit=limit, cur=cur)
+        for line in lines:
+            visit = dict(zip(db.origin_visit_get_cols, line))
+            yield self._origin_visit_apply_update(visit, db)
 
     @timed
     @db_transaction(statement_timeout=500)
     def origin_visit_find_by_date(
             self, origin: str, visit_date: datetime.datetime,
             db=None, cur=None) -> Optional[Dict[str, Any]]:
-        line = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
-        if line:
-            return dict(zip(db.origin_visit_get_cols, line))
+        visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
+        if visit:
+            return self._origin_visit_apply_update(visit, db)
         return None
 
     @timed
@@ -898,11 +988,11 @@
     def origin_visit_get_by(
             self, origin: str, visit: int,
             db=None, cur=None) -> Optional[Dict[str, Any]]:
-        ori_visit = db.origin_visit_get(origin, visit, cur)
-        if not ori_visit:
-            return None
-
-        return dict(zip(db.origin_visit_get_cols, ori_visit))
+        row = db.origin_visit_get(origin, visit, cur)
+        if row:
+            visit_dict = dict(zip(db.origin_visit_get_cols, row))
+            return self._origin_visit_apply_update(visit_dict, db)
+        return None
 
     @timed
     @db_transaction(statement_timeout=4000)
@@ -910,22 +1000,23 @@
             self, origin: str, allowed_statuses: Optional[List[str]] = None,
             require_snapshot: bool = False,
             db=None, cur=None) -> Optional[Dict[str, Any]]:
-        origin_visit = db.origin_visit_get_latest(
+        row = db.origin_visit_get_latest(
             origin, allowed_statuses=allowed_statuses,
             require_snapshot=require_snapshot, cur=cur)
-        if origin_visit:
-            return dict(zip(db.origin_visit_get_cols, origin_visit))
+        if row:
+            visit = dict(zip(db.origin_visit_get_cols, row))
+            return self._origin_visit_apply_update(visit, db)
         return None
 
     @timed
     @db_transaction()
     def origin_visit_get_random(
             self, type: str, db=None, cur=None) -> Optional[Dict[str, Any]]:
-        result = db.origin_visit_get_random(type, cur)
-        if result:
-            return dict(zip(db.origin_visit_get_cols, result))
-        else:
-            return None
+        row = db.origin_visit_get_random(type, cur)
+        if row:
+            visit = dict(zip(db.origin_visit_get_cols, row))
+            return self._origin_visit_apply_update(visit, db)
+        return None
 
     @timed
     @db_transaction(statement_timeout=2000)