D2938: pg-storage: Adapt internal implementations to use origin visit status model representation
D2938.id10934.diff
diff --git a/sql/upgrades/147.sql b/sql/upgrades/147.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/147.sql
@@ -0,0 +1,64 @@
+-- SWH DB schema upgrade
+-- from_version: 146
+-- to_version: 147
+-- description: Add origin_visit_status table
+-- 1. Rename enum origin_visit_status to origin_visit_state
+-- 2. Add new origin_visit_status table
+-- 3. Migrate origin_visit data into the new origin_visit_status table
+
+-- latest schema version
+insert into dbversion(version, release, description)
+ values(147, now(), 'Work In Progress');
+
+-- schema change
+
+-- Rename old enum
+alter type origin_visit_status rename to origin_visit_state;
+comment on type origin_visit_state IS 'Possible visit status';
+
+-- Update origin visit comment on deprecated columns
+comment on column origin_visit.status is '(Deprecated) Visit status';
+comment on column origin_visit.metadata is '(Deprecated) Optional origin visit metadata';
+comment on column origin_visit.snapshot is '(Deprecated) Optional snapshot of the origin visit. It can be partial.';
+
+
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit status updates
+create table origin_visit_status
+(
+ origin bigint not null,
+ visit bigint not null,
+ date timestamptz not null,
+ status origin_visit_state not null,
+ metadata jsonb,
+ snapshot sha1_git
+);
+
+comment on column origin_visit_status.origin is 'Origin concerned by the visit update';
+comment on column origin_visit_status.visit is 'Visit concerned by the visit update';
+comment on column origin_visit_status.date is 'Visit update timestamp';
+comment on column origin_visit_status.status is 'Visit status (ongoing, full, partial)';
+comment on column origin_visit_status.metadata is 'Optional origin visit metadata';
+comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit.';
+
+
+-- origin_visit_status
+
+create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date);
+alter table origin_visit_status add primary key using index origin_visit_status_pkey;
+
+alter table origin_visit_status
+ add constraint origin_visit_status_origin_visit_fkey
+ foreign key (origin, visit)
+ references origin_visit(origin, visit) not valid;
+alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey;
+
+
+-- data change
+
+-- best approximation of the visit update date is the origin_visit's date
+insert into origin_visit_status (origin, visit, date, status, metadata, snapshot)
+select origin, visit, date, status, metadata, snapshot
+from origin_visit
+on conflict (origin, visit, date)
+do nothing;
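
The backfill above copies every origin_visit row into origin_visit_status, reusing the visit date as the best available status date. A minimal sanity-check sketch, assuming psycopg2 and a database that has already run this upgrade; the DSN and helper name below are illustrative, not part of the diff:

import psycopg2


def check_status_backfill(dsn: str) -> None:
    """Assert that every origin_visit row got at least one status row."""
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                select count(*)
                from origin_visit ov
                left join origin_visit_status ovs
                  on ov.origin = ovs.origin and ov.visit = ovs.visit
                where ovs.origin is null
                """
            )
            missing = cur.fetchone()[0]
            assert missing == 0, f"{missing} visits have no origin_visit_status row"


check_status_backfill("dbname=softwareheritage")  # hypothetical DSN
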
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -6,10 +6,12 @@
import random
import select
+from typing import Any, Dict, Optional, Tuple
+
from swh.core.db import BaseDb
from swh.core.db.db_utils import stored_procedure, jsonize
from swh.core.db.db_utils import execute_values_generator
-from swh.model.model import OriginVisit, SHA1_SIZE
+from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE
class Db(BaseDb):
@@ -258,9 +260,13 @@
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
cur = self._cursor(cur)
query = """\
- SELECT snapshot FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url=%s AND origin_visit.visit=%s;
+ SELECT ovs.snapshot
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ WHERE o.url=%s AND ov.visit=%s
+ ORDER BY ovs.date DESC LIMIT 1
"""
cur.execute(query, (origin_url, visit_id))
@@ -440,30 +446,33 @@
)
return cur.fetchone()[0]
- def origin_visit_update(self, origin_id, visit_id, updates, cur=None):
- """Update origin_visit's status."""
+ origin_visit_status_cols = [
+ "origin",
+ "visit",
+ "date",
+ "status",
+ "snapshot",
+ "metadata",
+ ]
+
+ def origin_visit_status_add(
+ self, visit_status: OriginVisitStatus, cur=None
+ ) -> None:
+ assert self.origin_visit_status_cols[0] == "origin"
+ assert self.origin_visit_status_cols[-1] == "metadata"
+ cols = self.origin_visit_status_cols[1:-1]
cur = self._cursor(cur)
- update_cols = []
- values = []
- where = ["origin.id = origin_visit.origin", "origin.url=%s", "visit=%s"]
- where_values = [origin_id, visit_id]
- if "status" in updates:
- update_cols.append("status=%s")
- values.append(updates.pop("status"))
- if "metadata" in updates:
- update_cols.append("metadata=%s")
- values.append(jsonize(updates.pop("metadata")))
- if "snapshot" in updates:
- update_cols.append("snapshot=%s")
- values.append(updates.pop("snapshot"))
- assert not updates, "Unknown fields: %r" % updates
- query = """UPDATE origin_visit
- SET {update_cols}
- FROM origin
- WHERE {where}""".format(
- **{"update_cols": ", ".join(update_cols), "where": " AND ".join(where)}
+ cur.execute(
+ f"WITH origin_id as (select id from origin where url=%s) "
+ f"INSERT INTO origin_visit_status "
+ f"(origin, {', '.join(cols)}, metadata) "
+ f"VALUES ((select id from origin_id), "
+ f"{', '.join(['%s']*len(cols))}, %s) "
+ f"ON CONFLICT (origin, visit, date) do nothing",
+ [visit_status.origin]
+ + [getattr(visit_status, key) for key in cols]
+ + [jsonize(visit_status.metadata)],
)
- cur.execute(query, (*values, *where_values))
def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None:
# doing an extra query like this is way simpler than trying to join
@@ -504,15 +513,42 @@
"snapshot",
]
origin_visit_select_cols = [
- "origin.url AS origin",
- "visit",
- "date",
- "origin_visit.type AS type",
- "status",
- "metadata",
- "snapshot",
+ "o.url AS origin",
+ "ov.visit",
+ "ov.date",
+ "ov.type AS type",
+ "ovs.status",
+ "ovs.metadata",
+ "ovs.snapshot",
]
+ def _make_origin_visit_status(self, row: Tuple[Any, ...]) -> Optional[Dict[str, Any]]:
+ """Make an origin_visit_status dict out of a row
+
+ """
+ if not row:
+ return None
+ return dict(zip(self.origin_visit_status_cols, row))
+
+ def origin_visit_status_get_latest(
+ self, origin: str, visit: int, cur=None
+ ) -> Optional[Dict[str, Any]]:
+ """Given an origin visit id, return its latest origin_visit_status
+
+ """
+ cols = self.origin_visit_status_cols
+ cur = self._cursor(cur)
+ cur.execute(
+ f"SELECT {', '.join(cols)} "
+ f"FROM origin_visit_status ovs "
+ f"INNER JOIN origin o on o.id=ovs.origin "
+ f"WHERE o.url=%s AND ovs.visit=%s"
+ f"ORDER BY ovs.date DESC LIMIT 1",
+ (origin, visit),
+ )
+ row = cur.fetchone()
+ return self._make_origin_visit_status(row)
+
def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None):
"""Retrieve all visits for origin with id origin_id.
@@ -520,25 +556,27 @@
origin_id: The occurrence's origin
Yields:
- The occurrence's history visits
+ The visits for that origin
"""
cur = self._cursor(cur)
if last_visit:
- extra_condition = "and visit > %s"
+ extra_condition = "and ov.visit > %s"
args = (origin_id, last_visit, limit)
else:
extra_condition = ""
args = (origin_id, limit)
query = """\
- SELECT %s
- FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url=%%s %s
- order by visit asc
- limit %%s""" % (
+ SELECT DISTINCT ON (ov.visit) %s
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ WHERE o.url=%%s %s
+ ORDER BY ov.visit ASC, ovs.date DESC
+ LIMIT %%s""" % (
", ".join(self.origin_visit_select_cols),
extra_condition,
)
@@ -562,9 +600,13 @@
query = """\
SELECT %s
- FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url = %%s AND visit = %%s
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ WHERE o.url = %%s AND ov.visit = %%s
+ ORDER BY ovs.date DESC
+ LIMIT 1
""" % (
", ".join(self.origin_visit_select_cols)
)
@@ -580,9 +622,11 @@
cur.execute(
"SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date)
)
- r = cur.fetchall()
- if r:
- return r[0]
+ rows = cur.fetchall()
+ if rows:
+ visit = dict(zip(self.origin_visit_get_cols, rows[0]))
+ visit["origin"] = origin
+ return visit
def origin_visit_exists(self, origin_id, visit_id, cur=None):
"""Check whether an origin visit with the given ids exists"""
@@ -613,21 +657,25 @@
query_parts = [
"SELECT %s" % ", ".join(self.origin_visit_select_cols),
- "FROM origin_visit",
- "INNER JOIN origin ON origin.id = origin_visit.origin",
+ "FROM origin_visit ov ",
+ "INNER JOIN origin o ON o.id = ov.origin",
+ "INNER JOIN origin_visit_status ovs ",
+ "ON o.id = ovs.origin AND ov.visit = ovs.visit ",
]
- query_parts.append("WHERE origin.url = %s")
+ query_parts.append("WHERE o.url = %s")
if require_snapshot:
- query_parts.append("AND snapshot is not null")
+ query_parts.append("AND ovs.snapshot is not null")
if allowed_statuses:
query_parts.append(
- cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode()
+ cur.mogrify("AND ovs.status IN %s", (tuple(allowed_statuses),)).decode()
)
- query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1")
+ query_parts.append(
+ "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1"
+ )
query = "\n".join(query_parts)
@@ -644,18 +692,15 @@
"""
cur = self._cursor(cur)
columns = ",".join(self.origin_visit_select_cols)
- query = f"""with visits as (
- select *
- from origin_visit
- where origin_visit.status='full' and
- origin_visit.type=%s and
- origin_visit.date > now() - '3 months'::interval
- )
- select {columns}
- from visits as origin_visit
- inner join origin
- on origin_visit.origin=origin.id
- where random() < 0.1
+ query = f"""select {columns}
+ from origin_visit ov
+ inner join origin o on ov.origin=o.id
+ inner join origin_visit_status ovs
+ on ov.origin = ovs.origin and ov.visit = ovs.visit
+ where ovs.status='full'
+ and ov.type=%s
+ and ov.date > now() - '3 months'::interval
+ and random() < 0.1
limit 1
"""
cur.execute(query, (type,))
@@ -889,15 +934,17 @@
origin_cols = ",".join(self.origin_cols)
query = """SELECT %s
- FROM origin
+ FROM origin o
WHERE """
if with_visit:
query += """
EXISTS (
SELECT 1
- FROM origin_visit
- INNER JOIN snapshot ON snapshot=snapshot.id
- WHERE origin=origin.id
+ FROM origin_visit ov
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ INNER JOIN snapshot ON ovs.snapshot=snapshot.id
+ WHERE ov.origin=o.id
)
AND """
query += "url %s %%s "
diff --git a/swh/storage/sql/20-swh-enums.sql b/swh/storage/sql/20-swh-enums.sql
--- a/swh/storage/sql/20-swh-enums.sql
+++ b/swh/storage/sql/20-swh-enums.sql
@@ -14,9 +14,9 @@
create type snapshot_target as enum ('content', 'directory', 'revision', 'release', 'snapshot', 'alias');
comment on type snapshot_target is 'Types of targets for snapshot branches';
-create type origin_visit_status as enum (
+create type origin_visit_state as enum (
'ongoing',
'full',
'partial'
);
-comment on type origin_visit_status IS 'Possible visit status';
+comment on type origin_visit_state IS 'Possible visit status';
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -17,7 +17,7 @@
-- latest schema version
insert into dbversion(version, release, description)
- values(146, now(), 'Work In Progress');
+ values(147, now(), 'Work In Progress');
-- a SHA1 checksum
create domain sha1 as bytea check (length(value) = 20);
@@ -282,7 +282,8 @@
visit bigint not null,
date timestamptz not null,
type text not null,
- status origin_visit_status not null,
+ -- deprecated fields; remove them when done migrating the schema
+ status origin_visit_state not null,
metadata jsonb,
snapshot sha1_git
);
@@ -291,9 +292,29 @@
comment on column origin_visit.visit is 'Sequential visit number for the origin';
comment on column origin_visit.date is 'Visit timestamp';
comment on column origin_visit.type is 'Type of loader that did the visit (hg, git, ...)';
-comment on column origin_visit.status is 'Visit result';
-comment on column origin_visit.metadata is 'Origin metadata at visit time';
-comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
+comment on column origin_visit.status is '(Deprecated) Visit status';
+comment on column origin_visit.metadata is '(Deprecated) Optional origin visit metadata';
+comment on column origin_visit.snapshot is '(Deprecated) Optional snapshot of the origin visit. It can be partial.';
+
+
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit status updates
+create table origin_visit_status
+(
+ origin bigint not null,
+ visit bigint not null,
+ date timestamptz not null,
+ status origin_visit_state not null,
+ metadata jsonb,
+ snapshot sha1_git
+);
+
+comment on column origin_visit_status.origin is 'Origin concerned by the visit update';
+comment on column origin_visit_status.visit is 'Visit concerned by the visit update';
+comment on column origin_visit_status.date is 'Visit update timestamp';
+comment on column origin_visit_status.status is 'Visit status (ongoing, full, partial)';
+comment on column origin_visit_status.metadata is 'Optional origin visit metadata';
+comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit.';
-- A snapshot represents the entire state of a software origin as crawled by
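
Since origin_visit_status is append-only with primary key (origin, visit, date), the "current" state of a visit is whatever row has the highest date, which the queries in db.py compute with DISTINCT ON or ORDER BY ... DESC LIMIT 1. A standalone sketch of that reduction over already-fetched rows; the helper name and sample data are made up:

from typing import Any, Dict, Iterable, Tuple


def latest_status_per_visit(
    rows: Iterable[Dict[str, Any]]
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """Keep only the most recent status row for each (origin, visit) pair."""
    latest: Dict[Tuple[int, int], Dict[str, Any]] = {}
    for row in rows:
        key = (row["origin"], row["visit"])
        if key not in latest or row["date"] > latest[key]["date"]:
            latest[key] = row
    return latest


rows = [
    {"origin": 1, "visit": 1, "date": "2020-03-01", "status": "ongoing"},
    {"origin": 1, "visit": 1, "date": "2020-03-02", "status": "full"},
]
assert latest_status_per_visit(rows)[(1, 1)]["status"] == "full"
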
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql
--- a/swh/storage/sql/60-swh-indexes.sql
+++ b/swh/storage/sql/60-swh-indexes.sql
@@ -130,6 +130,17 @@
alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid;
alter table origin_visit validate constraint origin_visit_origin_fkey;
+-- origin_visit_status
+
+create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date);
+alter table origin_visit_status add primary key using index origin_visit_status_pkey;
+
+alter table origin_visit_status
+ add constraint origin_visit_status_origin_visit_fkey
+ foreign key (origin, visit)
+ references origin_visit(origin, visit) not valid;
+alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey;
+
-- release
create unique index concurrently release_pkey on release(id);
alter table release add primary key using index release_pkey;
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -23,6 +23,7 @@
Directory,
Origin,
OriginVisit,
+ OriginVisitStatus,
Revision,
Release,
SkippedContent,
@@ -843,6 +844,7 @@
with convert_validation_exceptions():
visit_id = db.origin_visit_add(origin_url, date, type, cur=cur)
+ status = "ongoing"
# We can write to the journal only after inserting to the
# DB, because we want the id of the visit
visit = OriginVisit.from_dict(
@@ -851,16 +853,42 @@
"date": date,
"type": type,
"visit": visit_id,
- "status": "ongoing",
+ # TODO: Remove when we remove those fields from the model
+ "status": status,
"metadata": None,
"snapshot": None,
}
)
+
+ with convert_validation_exceptions():
+ visit_status = OriginVisitStatus(
+ origin=origin_url,
+ visit=visit_id,
+ date=date,
+ status=status,
+ snapshot=None,
+ metadata=None,
+ )
+ self._origin_visit_status_add(visit_status, db=db, cur=cur)
+
self.journal_writer.origin_visit_add(visit)
send_metric("origin_visit:add", count=1, method_name="origin_visit")
return visit
+ def _origin_visit_status_add(
+ self, origin_visit_status: OriginVisitStatus, db, cur
+ ) -> None:
+ """Add an origin visit update"""
+ # Inject origin visit update in the schema
+ db.origin_visit_status_add(origin_visit_status, cur=cur)
+
+ # TODO: write the origin visit status to the journal
+
+ send_metric(
+ "origin_visit_status:add", count=1, method_name="origin_visit_status"
+ )
+
@timed
@db_transaction()
def origin_visit_update(
@@ -899,8 +927,66 @@
updated_visit = OriginVisit.from_dict({**visit, **updates})
self.journal_writer.origin_visit_update(updated_visit)
+ last_visit_status = self._origin_visit_get_updated(
+ origin, visit_id, db=db, cur=cur
+ )
+ assert last_visit_status is not None
+
with convert_validation_exceptions():
- db.origin_visit_update(origin_url, visit_id, updates, cur)
+ visit_status = OriginVisitStatus(
+ origin=origin_url,
+ visit=visit_id,
+ date=date or now(),
+ status=status,
+ snapshot=snapshot or last_visit_status["snapshot"],
+ metadata=metadata or last_visit_status["metadata"],
+ )
+ self._origin_visit_status_add(visit_status, db=db, cur=cur)
+
+ def _origin_visit_get_updated(
+ self, origin: str, visit_id: int, db, cur
+ ) -> Optional[Dict[str, Any]]:
+ """Retrieve origin visit and latest origin visit update and merge them
+ into an origin visit.
+
+ """
+ row_visit = db.origin_visit_get(origin, visit_id)
+ if row_visit is None:
+ return None
+ visit = dict(zip(db.origin_visit_get_cols, row_visit))
+ return self._origin_visit_apply_update(visit, db=db, cur=cur)
+
+ def _origin_visit_apply_update(
+ self, visit: Dict[str, Any], db, cur=None
+ ) -> Dict[str, Any]:
+ """Retrieve the latest visit update information for the origin visit.
+ Then merge it with the visit and return it.
+
+ """
+ visit_status = db.origin_visit_status_get_latest(
+ visit["origin"], visit["visit"]
+ )
+ return self._origin_visit_merge(visit, visit_status)
+
+ def _origin_visit_merge(
+ self, visit: Dict[str, Any], visit_status: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Merge origin_visit and origin_visit_status together.
+
+ """
+ return OriginVisit.from_dict(
+ {
+ # default to the values in visit
+ **visit,
+ # override with the last update
+ **visit_status,
+ # visit['origin'] is the URL (via a join), while
+ # visit_status['origin'] is only an id.
+ "origin": visit["origin"],
+ # but keep the date of the creation of the origin visit
+ "date": visit["date"],
+ }
+ ).to_dict()
@timed
@db_transaction()
@@ -915,7 +1001,18 @@
for visit in visits:
# TODO: upsert them all in a single query
+ assert visit.visit is not None
db.origin_visit_upsert(visit, cur=cur)
+ with convert_validation_exceptions():
+ visit_status = OriginVisitStatus(
+ origin=visit.origin,
+ visit=visit.visit,
+ date=now(),
+ status=visit.status,
+ snapshot=visit.snapshot,
+ metadata=visit.metadata,
+ )
+ db.origin_visit_status_add(visit_status, cur=cur)
@timed
@db_transaction_generator(statement_timeout=500)
@@ -927,20 +1024,21 @@
db=None,
cur=None,
) -> Iterable[Dict[str, Any]]:
- for line in db.origin_visit_get_all(
+ lines = db.origin_visit_get_all(
origin, last_visit=last_visit, limit=limit, cur=cur
- ):
- data = dict(zip(db.origin_visit_get_cols, line))
- yield data
+ )
+ for line in lines:
+ visit = dict(zip(db.origin_visit_get_cols, line))
+ yield self._origin_visit_apply_update(visit, db)
@timed
@db_transaction(statement_timeout=500)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime, db=None, cur=None
) -> Optional[Dict[str, Any]]:
- line = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
- if line:
- return dict(zip(db.origin_visit_get_cols, line))
+ visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
+ if visit:
+ return self._origin_visit_apply_update(visit, db)
return None
@timed
@@ -948,11 +1046,11 @@
def origin_visit_get_by(
self, origin: str, visit: int, db=None, cur=None
) -> Optional[Dict[str, Any]]:
- ori_visit = db.origin_visit_get(origin, visit, cur)
- if not ori_visit:
- return None
-
- return dict(zip(db.origin_visit_get_cols, ori_visit))
+ row = db.origin_visit_get(origin, visit, cur)
+ if row:
+ visit_dict = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit_dict, db)
+ return None
@timed
@db_transaction(statement_timeout=4000)
@@ -964,14 +1062,15 @@
db=None,
cur=None,
) -> Optional[Dict[str, Any]]:
- origin_visit = db.origin_visit_get_latest(
+ row = db.origin_visit_get_latest(
origin,
allowed_statuses=allowed_statuses,
require_snapshot=require_snapshot,
cur=cur,
)
- if origin_visit:
- return dict(zip(db.origin_visit_get_cols, origin_visit))
+ if row:
+ visit = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit, db)
return None
@timed
@@ -979,11 +1078,11 @@
def origin_visit_get_random(
self, type: str, db=None, cur=None
) -> Optional[Dict[str, Any]]:
- result = db.origin_visit_get_random(type, cur)
- if result:
- return dict(zip(db.origin_visit_get_cols, result))
- else:
- return None
+ row = db.origin_visit_get_random(type, cur)
+ if row:
+ visit = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit, db)
+ return None
@timed
@db_transaction(statement_timeout=2000)
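
On the storage.py side, every read path now merges the origin_visit row with its latest origin_visit_status via _origin_visit_apply_update / _origin_visit_merge: status, metadata and snapshot come from the status row, while the origin URL and the visit creation date are kept from the visit row. A standalone sketch of that merge on plain dicts, without OriginVisit validation; sample values are made up:

def merge_visit_and_status(visit: dict, visit_status: dict) -> dict:
    """Mirror of the dict merge done in _origin_visit_merge, without model validation."""
    return {
        **visit,          # defaults from the origin_visit row
        **visit_status,   # overridden by the latest status row
        "origin": visit["origin"],  # keep the URL, not the status row's origin id
        "date": visit["date"],      # keep the visit creation date
    }


visit = {
    "origin": "https://example.org/repo.git", "visit": 3,
    "date": "2020-03-01T10:00:00+00:00", "type": "git",
    "status": "ongoing", "metadata": None, "snapshot": None,
}
status = {
    "origin": 42, "visit": 3, "date": "2020-03-01T10:05:00+00:00",
    "status": "full", "metadata": None, "snapshot": b"\x00" * 20,
}

merged = merge_visit_and_status(visit, status)
assert merged["status"] == "full" and merged["origin"] == "https://example.org/repo.git"
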