Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9696935
D2938.id10642.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
23 KB
Subscribers
None
D2938.id10642.diff
View Options
diff --git a/sql/upgrades/147.sql b/sql/upgrades/147.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/147.sql
@@ -0,0 +1,51 @@
+-- SWH DB schema upgrade
+-- from_version: 146
+-- to_version: 147
+-- description: Add origin_visit_update table and migrate origin_visit
+-- to origin_visit_update
+
+-- latest schema version
+insert into dbversion(version, release, description)
+ values(147, now(), 'Work In Progress');
+
+-- schema change
+
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit updates
+create table origin_visit_update
+(
+ origin bigint not null,
+ visit bigint not null,
+ date timestamptz not null,
+ status origin_visit_status not null,
+ metadata jsonb,
+ snapshot sha1_git
+);
+
+comment on column origin_visit_update.origin is 'origin concerned by the visit update';
+comment on column origin_visit_update.visit is 'visit concerned by the visit update';
+comment on column origin_visit_update.date is 'Visit update timestamp';
+comment on column origin_visit_update.status is 'Visit update status';
+comment on column origin_visit_update.metadata is 'Origin metadata at visit update time';
+comment on column origin_visit_update.snapshot is 'Origin snapshot at visit update time';
+
+-- origin_visit_update
+
+create unique index concurrently origin_visit_update_pkey on origin_visit_update(origin, visit, date);
+alter table origin_visit_update add primary key using index origin_visit_update_pkey;
+
+alter table origin_visit_update
+ add constraint origin_visit_update_origin_visit_fkey
+ foreign key (origin, visit)
+ references origin_visit(origin, visit) not valid;
+alter table origin_visit_update validate constraint origin_visit_update_origin_visit_fkey;
+
+
+-- data change
+
+-- best approximation of the visit update date is the origin_visit's date
+insert into origin_visit_update (origin, visit, date, status, metadata, snapshot)
+select origin, visit, date, status, metadata, snapshot
+from origin_visit
+on conflict (origin, visit, date)
+do nothing;
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -6,10 +6,12 @@
import random
import select
+from typing import Any, Dict, Optional, Tuple
+
from swh.core.db import BaseDb
from swh.core.db.db_utils import stored_procedure, jsonize
from swh.core.db.db_utils import execute_values_generator
-from swh.model.model import OriginVisit, SHA1_SIZE
+from swh.model.model import OriginVisit, OriginVisitUpdate, SHA1_SIZE
class Db(BaseDb):
@@ -258,9 +260,13 @@
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
cur = self._cursor(cur)
query = """\
- SELECT snapshot FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url=%s AND origin_visit.visit=%s;
+ SELECT ovu.snapshot
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_update ovu
+ ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+ WHERE o.url=%s AND ov.visit=%s
+ ORDER BY ovu.date DESC LIMIT 1
"""
cur.execute(query, (origin_url, visit_id))
@@ -440,30 +446,33 @@
)
return cur.fetchone()[0]
- def origin_visit_update(self, origin_id, visit_id, updates, cur=None):
- """Update origin_visit's status."""
+ origin_visit_update_cols = [
+ "origin",
+ "visit",
+ "date",
+ "status",
+ "snapshot",
+ "metadata",
+ ]
+
+ def origin_visit_update_add(
+ self, visit_update: OriginVisitUpdate, cur=None
+ ) -> None:
+ assert self.origin_visit_update_cols[0] == "origin"
+ assert self.origin_visit_update_cols[-1] == "metadata"
+ cols = self.origin_visit_update_cols[1:-1]
cur = self._cursor(cur)
- update_cols = []
- values = []
- where = ["origin.id = origin_visit.origin", "origin.url=%s", "visit=%s"]
- where_values = [origin_id, visit_id]
- if "status" in updates:
- update_cols.append("status=%s")
- values.append(updates.pop("status"))
- if "metadata" in updates:
- update_cols.append("metadata=%s")
- values.append(jsonize(updates.pop("metadata")))
- if "snapshot" in updates:
- update_cols.append("snapshot=%s")
- values.append(updates.pop("snapshot"))
- assert not updates, "Unknown fields: %r" % updates
- query = """UPDATE origin_visit
- SET {update_cols}
- FROM origin
- WHERE {where}""".format(
- **{"update_cols": ", ".join(update_cols), "where": " AND ".join(where)}
+ cur.execute(
+ f"WITH origin_id as (select id from origin where url=%s) "
+ f"INSERT INTO origin_visit_update "
+ f"(origin, {', '.join(cols)}, metadata) "
+ f"VALUES ((select id from origin_id), "
+ f"{', '.join(['%s']*len(cols))}, %s) "
+ f"ON CONFLICT (origin, visit, date) do nothing",
+ [visit_update.origin]
+ + [getattr(visit_update, key) for key in cols]
+ + [jsonize(visit_update.metadata)],
)
- cur.execute(query, (*values, *where_values))
def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None:
# doing an extra query like this is way simpler than trying to join
@@ -504,15 +513,42 @@
"snapshot",
]
origin_visit_select_cols = [
- "origin.url AS origin",
- "visit",
- "date",
- "origin_visit.type AS type",
- "status",
- "metadata",
- "snapshot",
+ "o.url AS origin",
+ "ov.visit",
+ "ov.date",
+ "ov.type AS type",
+ "ovu.status",
+ "ovu.metadata",
+ "ovu.snapshot",
]
+ def _make_origin_visit_update(self, row: Tuple[Any]) -> Optional[Dict[str, Any]]:
+ """Make an origin_visit_update dict out of a row
+
+ """
+ if not row:
+ return None
+ return dict(zip(self.origin_visit_update_cols, row))
+
+ def origin_visit_update_get_latest(
+ self, origin: str, visit: int, cur=None
+ ) -> Optional[Dict[str, Any]]:
+ """Given an origin visit id, return its latest origin_visit_update
+
+ """
+ cols = self.origin_visit_update_cols
+ cur = self._cursor(cur)
+ cur.execute(
+ f"SELECT {', '.join(cols)} "
+ f"FROM origin_visit_update ovu "
+ f"INNER JOIN origin o on o.id=ovu.origin "
+        f"WHERE o.url=%s AND ovu.visit=%s "
+ f"ORDER BY ovu.date DESC LIMIT 1",
+ (origin, visit),
+ )
+ row = cur.fetchone()
+ return self._make_origin_visit_update(row)
+
def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None):
"""Retrieve all visits for origin with id origin_id.
@@ -520,25 +556,27 @@
origin_id: The occurrence's origin
Yields:
- The occurrence's history visits
+ The visits for that origin
"""
cur = self._cursor(cur)
if last_visit:
- extra_condition = "and visit > %s"
+ extra_condition = "and ov.visit > %s"
args = (origin_id, last_visit, limit)
else:
extra_condition = ""
args = (origin_id, limit)
query = """\
- SELECT %s
- FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url=%%s %s
- order by visit asc
- limit %%s""" % (
+ SELECT DISTINCT ON (ov.visit) %s
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_update ovu
+ ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+ WHERE o.url=%%s %s
+ ORDER BY ov.visit ASC, ovu.date DESC
+ LIMIT %%s""" % (
", ".join(self.origin_visit_select_cols),
extra_condition,
)
@@ -562,9 +600,13 @@
query = """\
SELECT %s
- FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url = %%s AND visit = %%s
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_update ovu
+ ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+ WHERE o.url = %%s AND ov.visit = %%s
+ ORDER BY ovu.date DESC
+ LIMIT 1
""" % (
", ".join(self.origin_visit_select_cols)
)
@@ -580,9 +622,11 @@
cur.execute(
"SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date)
)
- r = cur.fetchall()
- if r:
- return r[0]
+ rows = cur.fetchall()
+ if rows:
+ visit = dict(zip(self.origin_visit_get_cols, rows[0]))
+ visit["origin"] = origin
+ return visit
def origin_visit_exists(self, origin_id, visit_id, cur=None):
"""Check whether an origin visit with the given ids exists"""
@@ -613,21 +657,25 @@
query_parts = [
"SELECT %s" % ", ".join(self.origin_visit_select_cols),
- "FROM origin_visit",
- "INNER JOIN origin ON origin.id = origin_visit.origin",
+ "FROM origin_visit ov ",
+ "INNER JOIN origin o ON o.id = ov.origin",
+ "INNER JOIN origin_visit_update ovu ",
+ "ON o.id = ovu.origin AND ov.visit = ovu.visit ",
]
- query_parts.append("WHERE origin.url = %s")
+ query_parts.append("WHERE o.url = %s")
if require_snapshot:
- query_parts.append("AND snapshot is not null")
+ query_parts.append("AND ovu.snapshot is not null")
if allowed_statuses:
query_parts.append(
- cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode()
+ cur.mogrify("AND ovu.status IN %s", (tuple(allowed_statuses),)).decode()
)
- query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1")
+ query_parts.append(
+ "ORDER BY ov.date DESC, ov.visit DESC, ovu.date DESC LIMIT 1"
+ )
query = "\n".join(query_parts)
@@ -644,18 +692,15 @@
"""
cur = self._cursor(cur)
columns = ",".join(self.origin_visit_select_cols)
- query = f"""with visits as (
- select *
- from origin_visit
- where origin_visit.status='full' and
- origin_visit.type=%s and
- origin_visit.date > now() - '3 months'::interval
- )
- select {columns}
- from visits as origin_visit
- inner join origin
- on origin_visit.origin=origin.id
- where random() < 0.1
+ query = f"""select {columns}
+ from origin_visit ov
+ inner join origin o on ov.origin=o.id
+ inner join origin_visit_update ovu
+ on ov.origin = ovu.origin and ov.visit = ovu.visit
+ where ovu.status='full'
+ and ov.type=%s
+ and ov.date > now() - '3 months'::interval
+ and random() < 0.1
limit 1
"""
cur.execute(query, (type,))
@@ -889,15 +934,17 @@
origin_cols = ",".join(self.origin_cols)
query = """SELECT %s
- FROM origin
+ FROM origin o
WHERE """
if with_visit:
query += """
EXISTS (
SELECT 1
- FROM origin_visit
- INNER JOIN snapshot ON snapshot=snapshot.id
- WHERE origin=origin.id
+ FROM origin_visit ov
+ INNER JOIN origin_visit_update ovu
+ ON ov.origin = ovu.origin AND ov.visit = ovu.visit
+ INNER JOIN snapshot ON ovu.snapshot=snapshot.id
+ WHERE ov.origin=o.id
)
AND """
query += "url %s %%s "
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -17,7 +17,7 @@
-- latest schema version
insert into dbversion(version, release, description)
- values(146, now(), 'Work In Progress');
+ values(147, now(), 'Work In Progress');
-- a SHA1 checksum
create domain sha1 as bytea check (length(value) = 20);
@@ -282,6 +282,7 @@
visit bigint not null,
date timestamptz not null,
type text not null,
+ -- remove those when done migrating the schema
status origin_visit_status not null,
metadata jsonb,
snapshot sha1_git
@@ -296,6 +297,26 @@
comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit updates
+create table origin_visit_update
+(
+ origin bigint not null,
+ visit bigint not null,
+ date timestamptz not null,
+ status origin_visit_status not null,
+ metadata jsonb,
+ snapshot sha1_git
+);
+
+comment on column origin_visit_update.origin is 'origin concerned by the visit update';
+comment on column origin_visit_update.visit is 'visit concerned by the visit update';
+comment on column origin_visit_update.date is 'Visit update timestamp';
+comment on column origin_visit_update.status is 'Visit update status';
+comment on column origin_visit_update.metadata is 'Origin metadata at visit update time';
+comment on column origin_visit_update.snapshot is 'Origin snapshot at visit update time';
+
+
-- A snapshot represents the entire state of a software origin as crawled by
-- Software Heritage. This table is a simple mapping between (public) intrinsic
-- snapshot identifiers and (private) numeric sequential identifiers.
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql
--- a/swh/storage/sql/60-swh-indexes.sql
+++ b/swh/storage/sql/60-swh-indexes.sql
@@ -130,6 +130,17 @@
alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid;
alter table origin_visit validate constraint origin_visit_origin_fkey;
+-- origin_visit_update
+
+create unique index concurrently origin_visit_update_pkey on origin_visit_update(origin, visit, date);
+alter table origin_visit_update add primary key using index origin_visit_update_pkey;
+
+alter table origin_visit_update
+ add constraint origin_visit_update_origin_visit_fkey
+ foreign key (origin, visit)
+ references origin_visit(origin, visit) not valid;
+alter table origin_visit_update validate constraint origin_visit_update_origin_visit_fkey;
+
-- release
create unique index concurrently release_pkey on release(id);
alter table release add primary key using index release_pkey;
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -23,6 +23,7 @@
Directory,
Origin,
OriginVisit,
+ OriginVisitUpdate,
Revision,
Release,
SkippedContent,
@@ -842,6 +843,7 @@
with convert_validation_exceptions():
visit_id = db.origin_visit_add(origin_url, date, type, cur=cur)
+ status = "ongoing"
# We can write to the journal only after inserting to the
# DB, because we want the id of the visit
visit = OriginVisit.from_dict(
@@ -850,16 +852,42 @@
"date": date,
"type": type,
"visit": visit_id,
- "status": "ongoing",
+ # TODO: Remove when we remove those fields from the model
+ "status": status,
"metadata": None,
"snapshot": None,
}
)
+
+ with convert_validation_exceptions():
+ visit_update = OriginVisitUpdate(
+ origin=origin_url,
+ visit=visit_id,
+ date=date,
+ status=status,
+ snapshot=None,
+ metadata=None,
+ )
+ self._origin_visit_update_add(visit_update, db=db, cur=cur)
+
self.journal_writer.origin_visit_add(visit)
send_metric("origin_visit:add", count=1, method_name="origin_visit")
return visit
+ def _origin_visit_update_add(
+ self, origin_visit_update: OriginVisitUpdate, db, cur
+ ) -> None:
+ """Add an origin visit update"""
+ # Inject origin visit update in the schema
+ db.origin_visit_update_add(origin_visit_update, cur=cur)
+
+ # write to the journal the origin visit update
+
+ send_metric(
+ "origin_visit_update:add", count=1, method_name="origin_visit_update"
+ )
+
@timed
@db_transaction()
def origin_visit_update(
@@ -898,8 +926,66 @@
updated_visit = OriginVisit.from_dict({**visit, **updates})
self.journal_writer.origin_visit_update(updated_visit)
+ last_visit_update = self._origin_visit_get_updated(
+ origin, visit_id, db=db, cur=cur
+ )
+ assert last_visit_update is not None
+
with convert_validation_exceptions():
- db.origin_visit_update(origin_url, visit_id, updates, cur)
+ visit_update = OriginVisitUpdate(
+ origin=origin_url,
+ visit=visit_id,
+ date=date or now(),
+ status=status,
+ snapshot=snapshot or last_visit_update["snapshot"],
+ metadata=metadata or last_visit_update["metadata"],
+ )
+ self._origin_visit_update_add(visit_update, db=db, cur=cur)
+
+ def _origin_visit_get_updated(
+ self, origin: str, visit_id: int, db, cur
+ ) -> Optional[Dict[str, Any]]:
+ """Retrieve origin visit and latest origin visit update and merge them
+ into an origin visit.
+
+ """
+ row_visit = db.origin_visit_get(origin, visit_id)
+ if row_visit is None:
+ return None
+ visit = dict(zip(db.origin_visit_get_cols, row_visit))
+ return self._origin_visit_apply_update(visit, db=db, cur=cur)
+
+ def _origin_visit_apply_update(
+ self, visit: Dict[str, Any], db, cur=None
+ ) -> Dict[str, Any]:
+ """Retrieve the latest visit update information for the origin visit.
+ Then merge it with the visit and return it.
+
+ """
+ visit_update = db.origin_visit_update_get_latest(
+ visit["origin"], visit["visit"]
+ )
+ return self._origin_visit_merge(visit, visit_update)
+
+ def _origin_visit_merge(
+ self, visit: Dict[str, Any], visit_update: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Merge origin_visit and origin_visit_update together.
+
+ """
+ return OriginVisit.from_dict(
+ {
+ # default to the values in visit
+ **visit,
+ # override with the last update
+ **visit_update,
+ # visit['origin'] is the URL (via a join), while
+ # visit_update['origin'] is only an id.
+ "origin": visit["origin"],
+ # but keep the date of the creation of the origin visit
+ "date": visit["date"],
+ }
+ ).to_dict()
@timed
@db_transaction()
@@ -914,7 +1000,18 @@
for visit in visits:
# TODO: upsert them all in a single query
+ assert visit.visit is not None
db.origin_visit_upsert(visit, cur=cur)
+ with convert_validation_exceptions():
+ visit_update = OriginVisitUpdate(
+ origin=visit.origin,
+ visit=visit.visit,
+ date=now(),
+ status=visit.status,
+ snapshot=visit.snapshot,
+ metadata=visit.metadata,
+ )
+ db.origin_visit_update_add(visit_update, cur=cur)
@timed
@db_transaction_generator(statement_timeout=500)
@@ -926,20 +1023,21 @@
db=None,
cur=None,
) -> Iterable[Dict[str, Any]]:
- for line in db.origin_visit_get_all(
+ lines = db.origin_visit_get_all(
origin, last_visit=last_visit, limit=limit, cur=cur
- ):
- data = dict(zip(db.origin_visit_get_cols, line))
- yield data
+ )
+ for line in lines:
+ visit = dict(zip(db.origin_visit_get_cols, line))
+ yield self._origin_visit_apply_update(visit, db)
@timed
@db_transaction(statement_timeout=500)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime, db=None, cur=None
) -> Optional[Dict[str, Any]]:
- line = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
- if line:
- return dict(zip(db.origin_visit_get_cols, line))
+ visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
+ if visit:
+ return self._origin_visit_apply_update(visit, db)
return None
@timed
@@ -947,11 +1045,11 @@
def origin_visit_get_by(
self, origin: str, visit: int, db=None, cur=None
) -> Optional[Dict[str, Any]]:
- ori_visit = db.origin_visit_get(origin, visit, cur)
- if not ori_visit:
- return None
-
- return dict(zip(db.origin_visit_get_cols, ori_visit))
+ row = db.origin_visit_get(origin, visit, cur)
+ if row:
+ visit_dict = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit_dict, db)
+ return None
@timed
@db_transaction(statement_timeout=4000)
@@ -963,14 +1061,15 @@
db=None,
cur=None,
) -> Optional[Dict[str, Any]]:
- origin_visit = db.origin_visit_get_latest(
+ row = db.origin_visit_get_latest(
origin,
allowed_statuses=allowed_statuses,
require_snapshot=require_snapshot,
cur=cur,
)
- if origin_visit:
- return dict(zip(db.origin_visit_get_cols, origin_visit))
+ if row:
+ visit = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit, db)
return None
@timed
@@ -978,11 +1077,11 @@
def origin_visit_get_random(
self, type: str, db=None, cur=None
) -> Optional[Dict[str, Any]]:
- result = db.origin_visit_get_random(type, cur)
- if result:
- return dict(zip(db.origin_visit_get_cols, result))
- else:
- return None
+ row = db.origin_visit_get_random(type, cur)
+ if row:
+ visit = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit, db)
+ return None
@timed
@db_transaction(statement_timeout=2000)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 17, 10:14 PM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222852
Attached To
D2938: pg-storage: Adapt internal implementations to use origin visit status model representation
Event Timeline
Log In to Comment