diff --git a/sql/upgrades/147.sql b/sql/upgrades/147.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/147.sql
@@ -0,0 +1,14 @@
+-- SWH DB schema upgrade
+-- from_version: 146
+-- to_version: 147
+-- description: Add origin_visit_update table
+
+-- latest schema version
+insert into dbversion(version, release, description)
+      values(147, now(), 'Work In Progress');
+
+-- schema change
+-- FIXME: Fill in the blank once settled in the schema change
+
+-- migration change
+-- FIXME: Add a migration script from old data to new
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -827,7 +827,8 @@
 
     def origin_visit_update(
             self, origin: str, visit_id: int, status: str,
-            metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None):
+            metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None,
+            date: Optional[datetime.datetime] = None):
         origin_url = origin  # TODO: rename the argument
 
         # Get the existing data of the visit
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -6,10 +6,12 @@
 import random
 import select
 
+from typing import List
+
 from swh.core.db import BaseDb
 from swh.core.db.db_utils import stored_procedure, jsonize
 from swh.core.db.db_utils import execute_values_generator
-from swh.model.model import OriginVisit, SHA1_SIZE
+from swh.model.model import OriginVisit, OriginVisitUpdate, SHA1_SIZE
 
 
 class Db(BaseDb):
@@ -351,6 +353,31 @@
                     (origin, ts, type))
         return cur.fetchone()[0]
 
+    origin_visit_update_cols = [
+        'origin', 'visit', 'date', 'status', 'snapshot', 'metadata']
+
+    def origin_visit_update_add(
+            self, visit_update: OriginVisitUpdate, cur=None) -> None:
+        """Insert a row in origin_visit_update for the given visit update.
+
+        The origin is looked up by url (visit_update.origin) and stored as
+        its internal id; metadata is serialized with jsonize.
+
+        """
+        assert self.origin_visit_update_cols[0] == 'origin'
+        assert self.origin_visit_update_cols[-1] == 'metadata'
+        cols = self.origin_visit_update_cols[1:-1]
+        cur = self._cursor(cur)
+        cur.execute(
+            f"WITH origin_id as (select id from origin where url=%s) "
+            f"INSERT INTO origin_visit_update "
+            f"(origin, {', '.join(cols)}, metadata) "
+            f"VALUES ((select id from origin_id), "
+            f"{', '.join(['%s']*len(cols))}, %s)",
+            [visit_update.origin] +
+            [getattr(visit_update, key) for key in cols] +
+            [jsonize(visit_update.metadata)])
+
     def origin_visit_update(self, origin_id, visit_id, updates, cur=None):
         """Update origin_visit's status."""
         cur = self._cursor(cur)
@@ -404,6 +431,27 @@
         'origin.url AS origin', 'visit', 'date', 'origin_visit.type AS type',
         'status', 'metadata', 'snapshot']
 
+    def origin_visit_update_get_latest(
+            self, origin: str, visit: int, cur=None) -> List:
+        """Return the latest origin_visit_update row of a visit, or None.
+
+        The visit is identified by its origin url and its visit id. The row
+        follows origin_visit_update_cols, its origin being the origin url (not
+        the internal origin id).
+
+        """
+        cols = [
+            'o.url AS origin' if col == 'origin' else f'ovu.{col}'
+            for col in self.origin_visit_update_cols]
+        cur = self._cursor(cur)
+        cur.execute(f"SELECT {', '.join(cols)} "
+                    f"FROM origin_visit_update ovu "
+                    f"INNER JOIN origin o on o.id=ovu.origin "
+                    f"WHERE o.url=%s AND ovu.visit=%s "
+                    f"ORDER BY ovu.date DESC LIMIT 1",
+                    (origin, visit))
+        return cur.fetchone()
+
     def origin_visit_get_all(self, origin_id,
                              last_visit=None, limit=None, cur=None):
         """Retrieve all visits for origin with id origin_id.
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -14,13 +14,13 @@
 
 from collections import defaultdict
 from datetime import timedelta
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 import attr
 
 from swh.model.model import (
     BaseContent, Content, SkippedContent, Directory, Revision,
-    Release, Snapshot, OriginVisit, Origin, SHA1_SIZE
+    Release, Snapshot, OriginVisit, OriginVisitUpdate, Origin, SHA1_SIZE
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.storage.objstorage import ObjStorage
@@ -29,6 +29,7 @@
 
 from .converters import origin_url_to_sha1
 from .utils import get_partition_bounds_bytes
+from .validate import convert_validation_exceptions
 from .writer import JournalWriter
 
 # Max block size of contents to return
@@ -58,6 +59,8 @@
         self._origins_by_id = []
         self._origins_by_sha1 = {}
         self._origin_visits = {}
+        self._origin_visit_updates: Dict[
+            Tuple[str, int], List[OriginVisitUpdate]] = {}
         self._persons = []
         self._origin_metadata = defaultdict(list)
         self._tools = {}
@@ -500,7 +503,9 @@
         if origin_url not in self._origins or \
            visit > len(self._origin_visits[origin_url]):
             return None
-        snapshot_id = self._origin_visits[origin_url][visit-1].snapshot
+
+        visit = self._origin_visit_get_updated(origin_url, visit)
+        snapshot_id = visit.snapshot
         if snapshot_id:
             return self.snapshot_get(snapshot_id)
         else:
@@ -665,15 +670,18 @@
         else:
             origins = [orig for orig in origins if url_pattern in orig['url']]
         if with_visit:
-            origins = [
-                orig for orig in origins
-                if len(self._origin_visits[orig['url']]) > 0 and
-                set(ov.snapshot
-                    for ov in self._origin_visits[orig['url']]
-                    if ov.snapshot) &
-                set(self._snapshots)]
+            filtered_origins = []
+            for orig in origins:
+                visits = (self._origin_visit_get_updated(ov.origin, ov.visit)
+                          for ov in self._origin_visits[orig['url']])
+                for ov in visits:
+                    if ov.snapshot and ov.snapshot in self._snapshots:
+                        filtered_origins.append(orig)
+                        break
+        else:
+            filtered_origins = origins
 
-        return origins[offset:offset+limit]
+        return filtered_origins[offset:offset+limit]
 
     def origin_count(self, url_pattern, regexp=False, with_visit=False):
         return len(self.origin_search(url_pattern, regexp=regexp,
@@ -722,19 +730,33 @@
         # visit ids are in the range [1, +inf[
         visit_id = len(self._origin_visits[origin_url]) + 1
         status = 'ongoing'
-        visit = OriginVisit(
-            origin=origin_url,
-            date=date,
-            type=type,
-            status=status,
-            snapshot=None,
-            metadata=None,
-            visit=visit_id,
-        )
+        with convert_validation_exceptions():
+            visit = OriginVisit(
+                origin=origin_url,
+                date=date,
+                type=type,
+                # TODO: Remove when we remove those fields from the model
+                status=status,
+                snapshot=None,
+                metadata=None,
+                visit=visit_id,
+            )
         self._origin_visits[origin_url].append(visit)
-        visit = visit
-
-        self._objects[(origin_url, visit.visit)].append(
+        assert visit.visit is not None
+        visit_key = (origin_url, visit.visit)
+
+        with convert_validation_exceptions():
+            visit_update = OriginVisitUpdate(
+                origin=origin_url,
+                visit=visit_id,
+                date=date,
+                status=status,
+                snapshot=None,
+                metadata=None,
+            )
+        self._origin_visit_updates[visit_key] = [visit_update]
+
+        self._objects[visit_key].append(
             ('origin_visit', None))
 
         self.journal_writer.origin_visit_add(visit)
@@ -744,7 +766,8 @@
 
     def origin_visit_update(
             self, origin: str, visit_id: int, status: str,
-            metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None):
+            metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None,
+            date: Optional[datetime.datetime] = None):
         origin_url = self._get_origin_url(origin)
         if origin_url is None:
             raise StorageArgumentException('Unknown origin.')
@@ -755,43 +778,67 @@
             raise StorageArgumentException(
                 'Unknown visit_id for this origin') from None
 
-        updates: Dict[str, Any] = {
-            'status': status
-        }
-        if metadata:
-            updates['metadata'] = metadata
-        if snapshot:
-            updates['snapshot'] = snapshot
+        # Retrieve the previous visit update
+        assert visit.visit is not None
+        visit_key = (origin_url, visit.visit)
 
-        try:
-            visit = attr.evolve(visit, **updates)
-        except (KeyError, TypeError, ValueError) as e:
-            raise StorageArgumentException(*e.args)
+        last_visit_update = max(
+            self._origin_visit_updates[visit_key], key=lambda v: v.date)
+
+        with convert_validation_exceptions():
+            visit_update = OriginVisitUpdate(
+                origin=origin_url,
+                visit=visit_id,
+                date=date or now(),
+                status=status,
+                snapshot=snapshot or last_visit_update.snapshot,
+                metadata=metadata or last_visit_update.metadata,
+            )
+        self._origin_visit_updates[visit_key].append(visit_update)
 
-        self.journal_writer.origin_visit_update(visit)
+        self.journal_writer.origin_visit_update(
+            self._origin_visit_get_updated(origin_url, visit_id))
 
         self._origin_visits[origin_url][visit_id-1] = visit
 
     def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
         self.journal_writer.origin_visit_upsert(visits)
 
+        date = now()
+
         for visit in visits:
-            visit_id = visit.visit
             origin_url = visit.origin
+            origin = self.origin_get({'url': origin_url})
 
-            try:
-                visit = attr.evolve(visit, origin=origin_url)
-            except (KeyError, TypeError, ValueError) as e:
-                raise StorageArgumentException(*e.args)
-
-            self._objects[(origin_url, visit_id)].append(
-                ('origin_visit', None))
-
-            if visit_id:
-                while len(self._origin_visits[origin_url]) <= visit_id:
+            if not origin:  # Cannot add a visit without an origin
+                raise StorageArgumentException(
+                    'Unknown origin %s' % origin_url)
+
+            if origin_url in self._origins:
+                origin = self._origins[origin_url]
+                # visit ids are in the range [1, +inf[
+                assert visit.visit is not None
+                visit_key = (origin_url, visit.visit)
+
+                with convert_validation_exceptions():
+                    visit_update = OriginVisitUpdate(
+                        origin=origin_url,
+                        visit=visit.visit,
+                        date=date,
+                        status=visit.status,
+                        snapshot=visit.snapshot,
+                        metadata=visit.metadata,
+                    )
+
+                self._origin_visit_updates.setdefault(visit_key, [])
+                while len(self._origin_visits[origin_url]) <= visit.visit:
                     self._origin_visits[origin_url].append(None)
 
-                self._origin_visits[origin_url][visit_id-1] = visit
+                self._origin_visits[origin_url][visit.visit-1] = visit
+                self._origin_visit_updates[visit_key].append(visit_update)
+
+                self._objects[visit_key].append(
+                    ('origin_visit', None))
 
     def _convert_visit(self, visit):
         if visit is None:
@@ -801,6 +848,33 @@
 
         return visit
 
+    def _origin_visit_get_updated(
+            self, origin: str, visit_id: int) -> Optional[OriginVisit]:
+        """Merge origin visit and latest origin visit update
+
+        Returns:
+            the visit with its latest update applied (the date remains the
+            visit creation date), or None if the visit slot is empty
+
+        """
+        assert visit_id >= 1
+        visit = self._origin_visits[origin][visit_id-1]
+        if visit is None:
+            return None
+        visit_key = (origin, visit_id)
+
+        visit_update = max(
+            self._origin_visit_updates[visit_key], key=lambda v: v.date)
+
+        return OriginVisit.from_dict({
+            # default to the values in visit
+            **visit.to_dict(),
+            # override with the last update
+            **visit_update.to_dict(),
+            # but keep the date of the creation of the origin visit
+            'date': visit.date
+        })
+
     def origin_visit_get(self, origin, last_visit=None, limit=None):
         origin_url = self._get_origin_url(origin)
         if origin_url in self._origin_visits:
@@ -814,8 +888,8 @@
                     continue
                 visit_id = visit.visit
 
-                yield self._convert_visit(
-                    self._origin_visits[origin_url][visit_id-1])
+                yield self._origin_visit_get_updated(
+                    origin_url, visit_id).to_dict()
 
     def origin_visit_find_by_date(self, origin, visit_date):
         origin_url = self._get_origin_url(origin)
@@ -824,14 +898,15 @@
             visit = min(
                 visits,
                 key=lambda v: (abs(v.date - visit_date), -v.visit))
-            return self._convert_visit(visit)
+            return self._origin_visit_get_updated(
+                origin_url, visit.visit).to_dict()
 
     def origin_visit_get_by(self, origin, visit):
         origin_url = self._get_origin_url(origin)
         if origin_url in self._origin_visits and \
            visit <= len(self._origin_visits[origin_url]):
-            return self._convert_visit(
-                self._origin_visits[origin_url][visit-1])
+            updated = self._origin_visit_get_updated(origin_url, visit)
+            return updated.to_dict() if updated is not None else None
 
     def origin_visit_get_latest(
             self, origin, allowed_statuses=None, require_snapshot=False):
@@ -839,6 +914,9 @@
         if not origin:
             return
         visits = self._origin_visits[origin.url]
+        visits = [self._origin_visit_get_updated(visit.origin, visit.visit)
+                  for visit in visits
+                  if visit is not None]
         if allowed_statuses is not None:
             visits = [visit for visit in visits
                       if visit.status in allowed_statuses]
@@ -848,7 +926,9 @@
 
         visit = max(
             visits, key=lambda v: (v.date, v.visit), default=None)
-        return self._convert_visit(visit)
+        if visit is None:
+            return None
+        return visit.to_dict()
 
     def _select_random_origin_visit_by_type(self, type: str) -> str:
         while True:
@@ -864,8 +944,12 @@
         back_in_the_day = now() - timedelta(weeks=12)  # 3 months back
         # This should be enough for tests
         for visit in random_origin_visits:
-            if visit.date > back_in_the_day and visit.status == 'full':
-                return visit.to_dict()
+            updated_visit = self._origin_visit_get_updated(
+                url, visit.visit)
+            assert updated_visit is not None
+            if updated_visit.date > back_in_the_day \
+                    and updated_visit.status == 'full':
+                return updated_visit.to_dict()
         else:
             return None
 
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -795,7 +795,8 @@
     @remote_api_endpoint('origin/visit/update')
     def origin_visit_update(
             self, origin: str, visit_id: int, status: str,
-            metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None):
+            metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None,
+            date: Optional[datetime.datetime] = None):
         """Update an origin_visit's status.
 
         Args:
@@ -805,6 +806,7 @@
             metadata: Data associated to the visit
             snapshot (sha1_git): identifier of the snapshot to add to
                 the visit
+            date: Date of the update (defaults to the current time)
 
         Returns:
             None
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -17,7 +17,7 @@
 
 -- latest schema version
 insert into dbversion(version, release, description)
-      values(146, now(), 'Work In Progress');
+      values(147, now(), 'Work In Progress');
 
 -- a SHA1 checksum
 create domain sha1 as bytea check (length(value) = 20);
@@ -282,6 +282,7 @@
   visit         bigint not null,
   date          timestamptz not null,
   type          text not null,
+  -- remove those when done migrating the schema
   status        origin_visit_status not null,
   metadata      jsonb,
   snapshot      sha1_git
@@ -296,6 +297,28 @@
 comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
 
 
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit updates
+create table origin_visit_update
+(
+  id            bigserial not null,  -- TODO: Decide if we keep that or not
+  origin        bigint not null,
+  visit         bigint not null,
+  date          timestamptz not null,
+  status        origin_visit_status not null,
+  metadata      jsonb,
+  snapshot      sha1_git
+);
+
+comment on column origin_visit_update.id is 'visit update id';
+comment on column origin_visit_update.origin is 'origin concerned by the visit update';
+comment on column origin_visit_update.visit is 'visit concerned by the visit update';
+comment on column origin_visit_update.date is 'Visit update timestamp';
+comment on column origin_visit_update.status is 'Visit update status';
+comment on column origin_visit_update.metadata is 'Origin metadata at visit update time';
+comment on column origin_visit_update.snapshot is 'Origin snapshot at visit update time';
+
+
 -- A snapshot represents the entire state of a software origin as crawled by
 -- Software Heritage. This table is a simple mapping between (public) intrinsic
 -- snapshot identifiers and (private) numeric sequential identifiers.
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql
--- a/swh/storage/sql/60-swh-indexes.sql
+++ b/swh/storage/sql/60-swh-indexes.sql
@@ -130,6 +130,17 @@
 alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid;
 alter table origin_visit validate constraint origin_visit_origin_fkey;
 
+-- origin_visit_update
+
+create unique index concurrently origin_visit_update_pkey on origin_visit_update(origin, visit, date);
+alter table origin_visit_update add primary key using index origin_visit_update_pkey;
+
+alter table origin_visit_update
+  add constraint origin_visit_update_origin_visit_fkey
+  foreign key (origin, visit)
+  references origin_visit(origin, visit) not valid;
+alter table origin_visit_update validate constraint origin_visit_update_origin_visit_fkey;
+
 -- release
 create unique index concurrently release_pkey on release(id);
 alter table release add primary key using index release_pkey;
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -19,7 +19,7 @@
 import psycopg2.errors
 
 from swh.model.model import (
-    Content, Directory, Origin, OriginVisit,
+    Content, Directory, Origin, OriginVisit, OriginVisitUpdate,
     Revision, Release, SkippedContent, Snapshot, SHA1_SIZE
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
@@ -809,6 +809,7 @@
         with convert_validation_exceptions():
             visit_id = db.origin_visit_add(origin_url, date, type, cur=cur)
 
+        status = 'ongoing'
         # We can write to the journal only after inserting to the
         # DB, because we want the id of the visit
         visit = OriginVisit.from_dict({
@@ -816,21 +817,46 @@
             'date': date,
             'type': type,
             'visit': visit_id,
-            'status': 'ongoing',
+            # TODO: Remove when we remove those fields from the model
+            'status': status,
             'metadata': None,
             'snapshot': None
         })
+
+        with convert_validation_exceptions():
+            visit_update = OriginVisitUpdate(
+                origin=origin_url,
+                visit=visit_id,
+                date=date,
+                status=status,
+                snapshot=None,
+                metadata=None,
+            )
+        self._origin_visit_update_add(visit_update, db=db, cur=cur)
+
         self.journal_writer.origin_visit_add(visit)
 
         send_metric('origin_visit:add',
                     count=1, method_name='origin_visit')
         return visit
 
+    def _origin_visit_update_add(self, origin_visit_update: OriginVisitUpdate,
+                                 db, cur) -> None:
+        """Add an origin visit update"""
+        # Inject origin visit update in the schema
+        db.origin_visit_update_add(origin_visit_update, cur=cur)
+
+        # TODO: write the origin visit update to the journal
+
+        send_metric('origin_visit_update:add',
+                    count=1, method_name='origin_visit_update')
+
     @timed
     @db_transaction()
     def origin_visit_update(self, origin: str, visit_id: int, status: str,
                             metadata: Optional[Dict] = None,
                             snapshot: Optional[bytes] = None,
+                            date: Optional[datetime.datetime] = None,
                             db=None, cur=None):
         if not isinstance(origin, str):
             raise StorageArgumentException(
@@ -858,6 +884,51 @@
             with convert_validation_exceptions():
                 db.origin_visit_update(origin_url, visit_id, updates, cur)
 
+        last_visit_update = self._origin_visit_get_updated(
+            origin, visit_id, db=db, cur=cur)
+        assert last_visit_update is not None
+
+        with convert_validation_exceptions():
+            from swh.storage.in_memory import now
+            visit_update = OriginVisitUpdate(
+                origin=origin_url,
+                visit=visit_id,
+                date=date or now(),
+                status=status,
+                snapshot=snapshot or last_visit_update.snapshot,
+                metadata=metadata or last_visit_update.metadata,
+            )
+        self._origin_visit_update_add(visit_update, db=db, cur=cur)
+
+    def _origin_visit_get_updated(
+            self, origin: str, visit_id: int,
+            db, cur) -> Optional[OriginVisit]:
+        """Merge origin visit and latest origin visit update
+
+        Returns:
+            the visit with its latest update applied (the date remains the
+            visit creation date), or None if the visit does not exist
+
+        """
+        row_visit = db.origin_visit_get(origin, visit_id, cur=cur)
+        if row_visit is None:
+            return None
+        visit = dict(zip(db.origin_visit_get_cols, row_visit))
+
+        row_visit_update = db.origin_visit_update_get_latest(
+            origin, visit_id, cur=cur)
+        # a visit without any recorded update keeps its own values
+        visit_update = dict(zip(
+            db.origin_visit_update_cols, row_visit_update or ()))
+        return OriginVisit.from_dict({
+            # default to the values in visit
+            **visit,
+            # override with the last update
+            **visit_update,
+            # but keep the date of the creation of the origin visit
+            'date': visit['date']
+        })
+
     @timed
     @db_transaction()
     def origin_visit_upsert(self, visits: Iterable[OriginVisit],
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -1680,8 +1680,8 @@
         # given
         origin_url = swh_storage.origin_add_one(data.origin)
         origin_url2 = swh_storage.origin_add_one(data.origin2)
-        date_visit = datetime.datetime.now(datetime.timezone.utc)
-        date_visit2 = date_visit + datetime.timedelta(minutes=1)
+        date_visit = data.date_visit1
+        date_visit2 = data.date_visit2
 
         # Round to milliseconds before insertion, so equality doesn't fail
         # after a round-trip through a DB (eg. Cassandra)