Page MenuHomeSoftware Heritage

D2938.id10934.diff
No OneTemporary

D2938.id10934.diff

diff --git a/sql/upgrades/147.sql b/sql/upgrades/147.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/147.sql
@@ -0,0 +1,64 @@
+-- SWH DB schema upgrade
+-- from_version: 146
+-- to_version: 147
+-- description: Add origin_visit_status table
+-- 1. Rename enum origin_visit_status to origin_visit_state
+-- 2. Add new origin_visit_status table
+-- 3. Migrate origin_visit data to new origin_visit_status data
+
+-- latest schema version
+insert into dbversion(version, release, description)
+ values(147, now(), 'Work In Progress');
+
+-- schema change
+
+-- Rename old enum
+alter type origin_visit_status rename to origin_visit_state;
+comment on type origin_visit_state IS 'Possible visit status';
+
+-- Update origin visit comment on deprecated columns
+comment on column origin_visit.status is '(Deprecated) Visit status';
+comment on column origin_visit.metadata is '(Deprecated) Optional origin visit metadata';
+comment on column origin_visit.snapshot is '(Deprecated) Optional snapshot of the origin visit. It can be partial.';
+
+
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit status updates
+create table origin_visit_status
+(
+ origin bigint not null,
+ visit bigint not null,
+ date timestamptz not null,
+ status origin_visit_state not null,
+ metadata jsonb,
+ snapshot sha1_git
+);
+
+comment on column origin_visit_status.origin is 'Origin concerned by the visit update';
+comment on column origin_visit_status.visit is 'Visit concerned by the visit update';
+comment on column origin_visit_status.date is 'Visit update timestamp';
+comment on column origin_visit_status.status is 'Visit status (ongoing, full, partial)';
+comment on column origin_visit_status.metadata is 'Optional origin visit metadata';
+comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit.';
+
+
+-- origin_visit_status
+
+create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date);
+alter table origin_visit_status add primary key using index origin_visit_status_pkey;
+
+alter table origin_visit_status
+ add constraint origin_visit_status_origin_visit_fkey
+ foreign key (origin, visit)
+ references origin_visit(origin, visit) not valid;
+alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey;
+
+
+-- data change
+
+-- best approximation of the visit update date is the origin_visit's date
+insert into origin_visit_status (origin, visit, date, status, metadata, snapshot)
+select origin, visit, date, status, metadata, snapshot
+from origin_visit
+on conflict (origin, visit, date)
+do nothing;
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -6,10 +6,12 @@
import random
import select
+from typing import Any, Dict, Optional, Tuple
+
from swh.core.db import BaseDb
from swh.core.db.db_utils import stored_procedure, jsonize
from swh.core.db.db_utils import execute_values_generator
-from swh.model.model import OriginVisit, SHA1_SIZE
+from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE
class Db(BaseDb):
@@ -258,9 +260,13 @@
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
cur = self._cursor(cur)
query = """\
- SELECT snapshot FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url=%s AND origin_visit.visit=%s;
+ SELECT ovs.snapshot
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ WHERE o.url=%s AND ov.visit=%s
+ ORDER BY ovs.date DESC LIMIT 1
"""
cur.execute(query, (origin_url, visit_id))
@@ -440,30 +446,33 @@
)
return cur.fetchone()[0]
- def origin_visit_update(self, origin_id, visit_id, updates, cur=None):
- """Update origin_visit's status."""
+ # Column order of origin_visit_status rows; query results are zipped
+ # against this list to build dicts.
+ origin_visit_status_cols = [
+ "origin",
+ "visit",
+ "date",
+ "status",
+ "snapshot",
+ "metadata",
+ ]
+
+ def origin_visit_status_add(
+ self, visit_status: OriginVisitStatus, cur=None
+ ) -> None:
+ """Insert a new origin_visit_status row built from visit_status.
+
+ visit_status.origin is a url and is resolved to the numeric origin id
+ in SQL. A row whose (origin, visit, date) already exists is skipped
+ (ON CONFLICT ... do nothing).
+
+ """
+ # "origin" (url -> id lookup) and "metadata" (jsonize) are handled
+ # explicitly below; only the columns in between are bound as-is.
+ assert self.origin_visit_status_cols[0] == "origin"
+ assert self.origin_visit_status_cols[-1] == "metadata"
+ cols = self.origin_visit_status_cols[1:-1]
cur = self._cursor(cur)
- update_cols = []
- values = []
- where = ["origin.id = origin_visit.origin", "origin.url=%s", "visit=%s"]
- where_values = [origin_id, visit_id]
- if "status" in updates:
- update_cols.append("status=%s")
- values.append(updates.pop("status"))
- if "metadata" in updates:
- update_cols.append("metadata=%s")
- values.append(jsonize(updates.pop("metadata")))
- if "snapshot" in updates:
- update_cols.append("snapshot=%s")
- values.append(updates.pop("snapshot"))
- assert not updates, "Unknown fields: %r" % updates
- query = """UPDATE origin_visit
- SET {update_cols}
- FROM origin
- WHERE {where}""".format(
- **{"update_cols": ", ".join(update_cols), "where": " AND ".join(where)}
+ cur.execute(
+ f"WITH origin_id as (select id from origin where url=%s) "
+ f"INSERT INTO origin_visit_status "
+ f"(origin, {', '.join(cols)}, metadata) "
+ f"VALUES ((select id from origin_id), "
+ f"{', '.join(['%s']*len(cols))}, %s) "
+ f"ON CONFLICT (origin, visit, date) do nothing",
+ [visit_status.origin]
+ + [getattr(visit_status, key) for key in cols]
+ + [jsonize(visit_status.metadata)],
)
- cur.execute(query, (*values, *where_values))
def origin_visit_upsert(self, origin_visit: OriginVisit, cur=None) -> None:
# doing an extra query like this is way simpler than trying to join
@@ -504,15 +513,42 @@
"snapshot",
]
+ # These columns reference the table aliases o (origin), ov (origin_visit)
+ # and ovs (origin_visit_status): queries selecting them must define them.
origin_visit_select_cols = [
- "origin.url AS origin",
- "visit",
- "date",
- "origin_visit.type AS type",
- "status",
- "metadata",
- "snapshot",
+ "o.url AS origin",
+ "ov.visit",
+ "ov.date",
+ "ov.type AS type",
+ "ovs.status",
+ "ovs.metadata",
+ "ovs.snapshot",
]
+ def _make_origin_visit_status(
+ self, row: Optional[Tuple[Any, ...]]
+ ) -> Optional[Dict[str, Any]]:
+ """Make an origin_visit_status dict out of a row
+
+ Values are paired positionally with origin_visit_status_cols. A falsy
+ row (e.g. None when the query matched nothing) yields None.
+
+ """
+ if not row:
+ return None
+ return dict(zip(self.origin_visit_status_cols, row))
+
+ def origin_visit_status_get_latest(
+ self, origin: str, visit: int, cur=None
+ ) -> Optional[Dict[str, Any]]:
+ """Given an origin visit id, return its latest origin_visit_status
+
+ Args:
+ origin: url of the origin
+ visit: visit id within that origin
+
+ Returns:
+ the most recent (by date) origin_visit_status row as a dict keyed
+ by origin_visit_status_cols ("origin" holds the numeric origin id,
+ not the url), or None if the visit has no status
+
+ """
+ cols = ", ".join(f"ovs.{col}" for col in self.origin_visit_status_cols)
+ cur = self._cursor(cur)
+ cur.execute(
+ # each fragment ends with a space: they are concatenated as-is
+ f"SELECT {cols} "
+ "FROM origin_visit_status ovs "
+ "INNER JOIN origin o on o.id=ovs.origin "
+ "WHERE o.url=%s AND ovs.visit=%s "
+ "ORDER BY ovs.date DESC LIMIT 1",
+ (origin, visit),
+ )
+ row = cur.fetchone()
+ return self._make_origin_visit_status(row)
+
def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None):
"""Retrieve all visits for origin with id origin_id.
@@ -520,25 +556,27 @@
origin_id: The occurrence's origin
Yields:
- The occurrence's history visits
+ The visits for that origin
"""
cur = self._cursor(cur)
if last_visit:
- extra_condition = "and visit > %s"
+ extra_condition = "and ov.visit > %s"
args = (origin_id, last_visit, limit)
else:
extra_condition = ""
args = (origin_id, limit)
query = """\
- SELECT %s
- FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url=%%s %s
- order by visit asc
- limit %%s""" % (
+ SELECT DISTINCT ON (ov.visit) %s
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ WHERE o.url=%%s %s
+ ORDER BY ov.visit ASC, ovs.date DESC
+ LIMIT %%s""" % (
", ".join(self.origin_visit_select_cols),
extra_condition,
)
@@ -562,9 +600,13 @@
query = """\
SELECT %s
- FROM origin_visit
- INNER JOIN origin ON origin.id = origin_visit.origin
- WHERE origin.url = %%s AND visit = %%s
+ FROM origin_visit ov
+ INNER JOIN origin o ON o.id = ov.origin
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ WHERE o.url = %%s AND ov.visit = %%s
+ ORDER BY ovs.date DESC
+ LIMIT 1
""" % (
", ".join(self.origin_visit_select_cols)
)
@@ -580,9 +622,11 @@
cur.execute(
"SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date)
)
- r = cur.fetchall()
- if r:
- return r[0]
+ rows = cur.fetchall()
+ if rows:
+ visit = dict(zip(self.origin_visit_get_cols, rows[0]))
+ visit["origin"] = origin
+ return visit
def origin_visit_exists(self, origin_id, visit_id, cur=None):
"""Check whether an origin visit with the given ids exists"""
@@ -613,21 +657,25 @@
query_parts = [
"SELECT %s" % ", ".join(self.origin_visit_select_cols),
- "FROM origin_visit",
- "INNER JOIN origin ON origin.id = origin_visit.origin",
+ "FROM origin_visit ov ",
+ "INNER JOIN origin o ON o.id = ov.origin",
+ "INNER JOIN origin_visit_status ovs ",
+ "ON o.id = ovs.origin AND ov.visit = ovs.visit ",
]
- query_parts.append("WHERE origin.url = %s")
+ query_parts.append("WHERE o.url = %s")
if require_snapshot:
- query_parts.append("AND snapshot is not null")
+ query_parts.append("AND ovs.snapshot is not null")
if allowed_statuses:
query_parts.append(
- cur.mogrify("AND status IN %s", (tuple(allowed_statuses),)).decode()
+ cur.mogrify("AND ovs.status IN %s", (tuple(allowed_statuses),)).decode()
)
- query_parts.append("ORDER BY date DESC, visit DESC LIMIT 1")
+ query_parts.append(
+ "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1"
+ )
query = "\n".join(query_parts)
@@ -644,18 +692,15 @@
"""
cur = self._cursor(cur)
columns = ",".join(self.origin_visit_select_cols)
- query = f"""with visits as (
- select *
- from origin_visit
- where origin_visit.status='full' and
- origin_visit.type=%s and
- origin_visit.date > now() - '3 months'::interval
- )
- select {columns}
- from visits as origin_visit
- inner join origin
- on origin_visit.origin=origin.id
- where random() < 0.1
+ query = f"""select {columns}
+ from origin_visit ov
+ inner join origin o on ov.origin=o.id
+ inner join origin_visit_status ovs
+ on ov.origin = ovs.origin and ov.visit = ovs.visit
+ where ovs.status='full'
+ and ov.type=%s
+ and ov.date > now() - '3 months'::interval
+ and random() < 0.1
limit 1
"""
cur.execute(query, (type,))
@@ -889,15 +934,17 @@
origin_cols = ",".join(self.origin_cols)
query = """SELECT %s
- FROM origin
+ FROM origin o
WHERE """
if with_visit:
query += """
EXISTS (
SELECT 1
- FROM origin_visit
- INNER JOIN snapshot ON snapshot=snapshot.id
- WHERE origin=origin.id
+ FROM origin_visit ov
+ INNER JOIN origin_visit_status ovs
+ ON ov.origin = ovs.origin AND ov.visit = ovs.visit
+ INNER JOIN snapshot ON ovs.snapshot=snapshot.id
+ WHERE ov.origin=o.id
)
AND """
query += "url %s %%s "
diff --git a/swh/storage/sql/20-swh-enums.sql b/swh/storage/sql/20-swh-enums.sql
--- a/swh/storage/sql/20-swh-enums.sql
+++ b/swh/storage/sql/20-swh-enums.sql
@@ -14,9 +14,9 @@
create type snapshot_target as enum ('content', 'directory', 'revision', 'release', 'snapshot', 'alias');
comment on type snapshot_target is 'Types of targets for snapshot branches';
-create type origin_visit_status as enum (
+-- formerly named origin_visit_status; that name now belongs to the
+-- origin_visit_status table
+create type origin_visit_state as enum (
'ongoing',
'full',
'partial'
);
-comment on type origin_visit_status IS 'Possible visit status';
+comment on type origin_visit_state IS 'Possible visit status';
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -17,7 +17,7 @@
-- latest schema version
insert into dbversion(version, release, description)
- values(146, now(), 'Work In Progress');
+ values(147, now(), 'Work In Progress');
-- a SHA1 checksum
create domain sha1 as bytea check (length(value) = 20);
@@ -282,7 +282,8 @@
visit bigint not null,
date timestamptz not null,
type text not null,
- status origin_visit_status not null,
+ -- remove those when done migrating the schema
+ status origin_visit_state not null,
metadata jsonb,
snapshot sha1_git
);
@@ -291,9 +292,29 @@
comment on column origin_visit.visit is 'Sequential visit number for the origin';
comment on column origin_visit.date is 'Visit timestamp';
comment on column origin_visit.type is 'Type of loader that did the visit (hg, git, ...)';
-comment on column origin_visit.status is 'Visit result';
-comment on column origin_visit.metadata is 'Origin metadata at visit time';
-comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
+comment on column origin_visit.status is '(Deprecated) Visit status';
+comment on column origin_visit.metadata is '(Deprecated) Optional origin visit metadata';
+comment on column origin_visit.snapshot is '(Deprecated) Optional snapshot of the origin visit. It can be partial.';
+
+
+-- Crawling history of software origin visits by Software Heritage. Each
+-- visit sees its history change through new origin visit status updates
+create table origin_visit_status
+(
+ origin bigint not null,
+ visit bigint not null,
+ date timestamptz not null,
+ status origin_visit_state not null,
+ metadata jsonb,
+ snapshot sha1_git
+);
+
+comment on column origin_visit_status.origin is 'Origin concerned by the visit update';
+comment on column origin_visit_status.visit is 'Visit concerned by the visit update';
+comment on column origin_visit_status.date is 'Visit update timestamp';
+comment on column origin_visit_status.status is 'Visit status (ongoing, full, partial)';
+comment on column origin_visit_status.metadata is 'Optional origin visit metadata';
+comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit.';
-- A snapshot represents the entire state of a software origin as crawled by
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql
--- a/swh/storage/sql/60-swh-indexes.sql
+++ b/swh/storage/sql/60-swh-indexes.sql
@@ -130,6 +130,17 @@
alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid;
alter table origin_visit validate constraint origin_visit_origin_fkey;
+-- origin_visit_status
+
+create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date);
+alter table origin_visit_status add primary key using index origin_visit_status_pkey;
+
+-- foreign key created "not valid" then validated in a separate statement,
+-- following the same pattern as origin_visit_origin_fkey above
+alter table origin_visit_status
+ add constraint origin_visit_status_origin_visit_fkey
+ foreign key (origin, visit)
+ references origin_visit(origin, visit) not valid;
+alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey;
+
-- release
create unique index concurrently release_pkey on release(id);
alter table release add primary key using index release_pkey;
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -23,6 +23,7 @@
Directory,
Origin,
OriginVisit,
+ OriginVisitStatus,
Revision,
Release,
SkippedContent,
@@ -843,6 +844,7 @@
with convert_validation_exceptions():
visit_id = db.origin_visit_add(origin_url, date, type, cur=cur)
+ status = "ongoing"
# We can write to the journal only after inserting to the
# DB, because we want the id of the visit
visit = OriginVisit.from_dict(
@@ -851,16 +853,42 @@
"date": date,
"type": type,
"visit": visit_id,
- "status": "ongoing",
+ # TODO: Remove when we remove those fields from the model
+ "status": status,
"metadata": None,
"snapshot": None,
}
)
+
+ with convert_validation_exceptions():
+ visit_status = OriginVisitStatus(
+ origin=origin_url,
+ visit=visit_id,
+ date=date,
+ status=status,
+ snapshot=None,
+ metadata=None,
+ )
+ self._origin_visit_status_add(visit_status, db=db, cur=cur)
+
self.journal_writer.origin_visit_add(visit)
send_metric("origin_visit:add", count=1, method_name="origin_visit")
return visit
+ def _origin_visit_status_add(
+ self, origin_visit_status: OriginVisitStatus, db, cur
+ ) -> None:
+ """Add an origin visit update
+
+ Inserts the status row through the db layer, then sends an
+ "origin_visit_status:add" metric.
+ """
+ # Inject origin visit update in the schema
+ db.origin_visit_status_add(origin_visit_status, cur=cur)
+
+ # TODO: write the origin visit update to the journal; nothing is
+ # written to the journal here yet
+
+ send_metric(
+ "origin_visit_status:add", count=1, method_name="origin_visit_status"
+ )
+
@timed
@db_transaction()
def origin_visit_update(
@@ -899,8 +927,66 @@
updated_visit = OriginVisit.from_dict({**visit, **updates})
self.journal_writer.origin_visit_update(updated_visit)
+ last_visit_status = self._origin_visit_get_updated(
+ origin, visit_id, db=db, cur=cur
+ )
+ assert last_visit_status is not None
+
with convert_validation_exceptions():
- db.origin_visit_update(origin_url, visit_id, updates, cur)
+ visit_status = OriginVisitStatus(
+ origin=origin_url,
+ visit=visit_id,
+ date=date or now(),
+ status=status,
+ snapshot=snapshot or last_visit_status["snapshot"],
+ metadata=metadata or last_visit_status["metadata"],
+ )
+ self._origin_visit_status_add(visit_status, db=db, cur=cur)
+
+ def _origin_visit_get_updated(
+ self, origin: str, visit_id: int, db, cur
+ ) -> Optional[Dict[str, Any]]:
+ """Retrieve origin visit and latest origin visit update and merge them
+ into an origin visit.
+
+ Returns None when the origin visit does not exist.
+
+ """
+ # use the caller's cursor for both reads, as the rest of the
+ # transaction does (cur was previously ignored for this query)
+ row_visit = db.origin_visit_get(origin, visit_id, cur)
+ if row_visit is None:
+ return None
+ visit = dict(zip(db.origin_visit_get_cols, row_visit))
+ return self._origin_visit_apply_update(visit, db=db, cur=cur)
+
+ def _origin_visit_apply_update(
+ self, visit: Dict[str, Any], db, cur=None
+ ) -> Dict[str, Any]:
+ """Retrieve the latest visit update information for the origin visit.
+ Then merge it with the visit and return it.
+
+ If the visit has no origin_visit_status row, the visit is returned
+ unchanged instead of failing in the merge.
+
+ """
+ visit_status = db.origin_visit_status_get_latest(
+ visit["origin"], visit["visit"], cur=cur
+ )
+ if visit_status is None:
+ return visit
+ return self._origin_visit_merge(visit, visit_status)
+
+ def _origin_visit_merge(
+ self, visit: Dict[str, Any], visit_status: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Merge origin_visit and origin_visit_status together.
+
+ Keys from visit_status override those of visit, except "origin" (kept
+ as the url from visit) and "date" (kept as the visit creation date).
+ The merged dict is round-tripped through OriginVisit.from_dict /
+ to_dict, so it is validated as an OriginVisit.
+
+ """
+ return OriginVisit.from_dict(
+ {
+ # default to the values in visit
+ **visit,
+ # override with the last update
+ **visit_status,
+ # visit['origin'] is the URL (via a join), while
+ # visit_status['origin'] is only an id.
+ "origin": visit["origin"],
+ # but keep the date of the creation of the origin visit
+ "date": visit["date"],
+ }
+ ).to_dict()
@timed
@db_transaction()
@@ -915,7 +1001,18 @@
for visit in visits:
# TODO: upsert them all in a single query
+ assert visit.visit is not None
db.origin_visit_upsert(visit, cur=cur)
+ with convert_validation_exceptions():
+ visit_status = OriginVisitStatus(
+ origin=visit.origin,
+ visit=visit.visit,
+ date=now(),
+ status=visit.status,
+ snapshot=visit.snapshot,
+ metadata=visit.metadata,
+ )
+ db.origin_visit_status_add(visit_status, cur=cur)
@timed
@db_transaction_generator(statement_timeout=500)
@@ -927,20 +1024,21 @@
db=None,
cur=None,
) -> Iterable[Dict[str, Any]]:
- for line in db.origin_visit_get_all(
+ lines = db.origin_visit_get_all(
origin, last_visit=last_visit, limit=limit, cur=cur
- ):
- data = dict(zip(db.origin_visit_get_cols, line))
- yield data
+ )
+ for line in lines:
+ visit = dict(zip(db.origin_visit_get_cols, line))
+ yield self._origin_visit_apply_update(visit, db)
@timed
@db_transaction(statement_timeout=500)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime, db=None, cur=None
) -> Optional[Dict[str, Any]]:
+ """Return the visit of origin selected by the db layer for visit_date,
+ merged with its latest origin_visit_status, or None if none matches.
+ """
- line = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
- if line:
- return dict(zip(db.origin_visit_get_cols, line))
+ visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
+ if visit:
+ return self._origin_visit_apply_update(visit, db)
return None
@timed
@@ -948,11 +1046,11 @@
def origin_visit_get_by(
self, origin: str, visit: int, db=None, cur=None
) -> Optional[Dict[str, Any]]:
+ """Return visit number `visit` of `origin` merged with its latest
+ origin_visit_status, or None if that visit does not exist.
+ """
- ori_visit = db.origin_visit_get(origin, visit, cur)
- if not ori_visit:
- return None
-
- return dict(zip(db.origin_visit_get_cols, ori_visit))
+ row = db.origin_visit_get(origin, visit, cur)
+ if row:
+ visit_dict = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit_dict, db)
+ return None
@timed
@db_transaction(statement_timeout=4000)
@@ -964,14 +1062,15 @@
db=None,
cur=None,
) -> Optional[Dict[str, Any]]:
- origin_visit = db.origin_visit_get_latest(
+ row = db.origin_visit_get_latest(
origin,
allowed_statuses=allowed_statuses,
require_snapshot=require_snapshot,
cur=cur,
)
- if origin_visit:
- return dict(zip(db.origin_visit_get_cols, origin_visit))
+ if row:
+ visit = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit, db)
return None
@timed
@@ -979,11 +1078,11 @@
def origin_visit_get_random(
self, type: str, db=None, cur=None
) -> Optional[Dict[str, Any]]:
+ """Return a randomly sampled visit of the given type (recent, 'full'
+ status, as selected by the db query) merged with its latest
+ origin_visit_status, or None when the sample is empty.
+ """
- result = db.origin_visit_get_random(type, cur)
- if result:
- return dict(zip(db.origin_visit_get_cols, result))
- else:
- return None
+ row = db.origin_visit_get_random(type, cur)
+ if row:
+ visit = dict(zip(db.origin_visit_get_cols, row))
+ return self._origin_visit_apply_update(visit, db)
+ return None
@timed
@db_transaction(statement_timeout=2000)

File Metadata

Mime Type
text/plain
Expires
Wed, Jul 2, 10:32 AM (2 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220000

Event Timeline