Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9348314
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
52 KB
Subscribers
None
View Options
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index fc926c4..3c06d7e 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,97 +1,97 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .archive import ArchiveInterface
from .interface import ProvenanceInterface, ProvenanceStorageInterface
def get_archive(cls: str, **kwargs) -> ArchiveInterface:
"""Get an archive object of class ``cls`` with arguments ``args``.
Args:
cls: archive's class, either 'api' or 'direct'
args: dictionary of arguments passed to the archive class constructor
Returns:
an instance of archive object (either using swh.storage API or direct
queries to the archive's database)
Raises:
:exc:`ValueError` if passed an unknown archive class.
"""
if cls == "api":
from swh.storage import get_storage
from .storage.archive import ArchiveStorage
return ArchiveStorage(get_storage(**kwargs["storage"]))
elif cls == "direct":
from swh.core.db import BaseDb
from .postgresql.archive import ArchivePostgreSQL
return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn)
else:
raise ValueError
def get_provenance(**kwargs) -> ProvenanceInterface:
"""Get an provenance object with arguments ``args``.
Args:
args: dictionary of arguments to retrieve a swh.provenance.storage
class (see :func:`get_provenance_storage` for details)
Returns:
an instance of provenance object
"""
from .provenance import Provenance
return Provenance(get_provenance_storage(**kwargs))
def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface:
"""Get an archive object of class ``cls`` with arguments ``args``.
Args:
cls: storage's class, only 'local' is currently supported
args: dictionary of arguments passed to the storage class constructor
Returns:
an instance of storage object
Raises:
:exc:`ValueError` if passed an unknown storage class.
"""
if cls == "local":
from swh.core.db import BaseDb
from .postgresql.provenancedb_base import ProvenanceDBBase
conn = BaseDb.connect(**kwargs["db"]).conn
raise_on_commit = kwargs.get("raise_on_commit", False)
- if ProvenanceDBBase(conn, raise_on_commit).flavor == "with-path":
+ if "with-path" in ProvenanceDBBase(conn, raise_on_commit).flavor:
from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
return ProvenanceWithPathDB(conn, raise_on_commit)
else:
from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
return ProvenanceWithoutPathDB(conn, raise_on_commit)
elif cls == "remote":
from .api.client import RemoteProvenanceStorage
storage = RemoteProvenanceStorage(**kwargs)
assert isinstance(storage, ProvenanceStorageInterface)
return storage
else:
raise ValueError
diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
index 6e1b6f3..42d6490 100644
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,319 +1,357 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import itertools
import logging
from typing import Dict, Generator, Iterable, Optional, Set, Tuple
import psycopg2
import psycopg2.extras
from typing_extensions import Literal
from swh.core.db import BaseDb
from swh.model.model import Sha1Git
from ..interface import (
EntityType,
ProvenanceResult,
RelationData,
RelationType,
RevisionData,
)
class ProvenanceDBBase:
def __init__(
self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False
):
BaseDb.adapt_conn(conn)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn.set_session(autocommit=True)
self.conn = conn
self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
# XXX: not sure this is the best place to do it!
sql = "SET timezone TO 'UTC'"
self.cursor.execute(sql)
self._flavor: Optional[str] = None
self.raise_on_commit = raise_on_commit
@property
def flavor(self) -> str:
if self._flavor is None:
sql = "SELECT swh_get_dbflavor() AS flavor"
self.cursor.execute(sql)
self._flavor = self.cursor.fetchone()["flavor"]
assert self._flavor is not None
return self._flavor
def with_path(self) -> bool:
- return self.flavor == "with-path"
+ return "with-path" in self.flavor
+
+ @property
+ def denormalized(self) -> bool:
+ return "denormalized" in self.flavor
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
...
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
...
def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
return self._entity_set_date("content", dates)
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return self._entity_get_date("content", ids)
def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
return self._entity_set_date("directory", dates)
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return self._entity_get_date("directory", ids)
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
sql = f"SELECT sha1 FROM {entity.value}"
self.cursor.execute(sql)
return {row["sha1"] for row in self.cursor.fetchall()}
def location_get(self) -> Set[bytes]:
sql = "SELECT encode(location.path::bytea, 'escape') AS path FROM location"
self.cursor.execute(sql)
return {row["path"] for row in self.cursor.fetchall()}
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool:
try:
if urls:
sql = """
LOCK TABLE ONLY origin;
INSERT INTO origin(sha1, url) VALUES %s
ON CONFLICT DO NOTHING
"""
psycopg2.extras.execute_values(self.cursor, sql, urls.items())
return True
except: # noqa: E722
# Unexpected error occurred, rollback all changes and log message
logging.exception("Unexpected error")
if self.raise_on_commit:
raise
return False
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
urls: Dict[Sha1Git, str] = {}
sha1s = tuple(ids)
if sha1s:
values = ", ".join(itertools.repeat("%s", len(sha1s)))
sql = f"""
SELECT sha1, url
FROM origin
WHERE sha1 IN ({values})
"""
self.cursor.execute(sql, sha1s)
urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall())
return urls
def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
return self._entity_set_date("revision", dates)
def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
try:
if origins:
sql = """
LOCK TABLE ONLY revision;
INSERT INTO revision(sha1, origin)
(SELECT V.rev AS sha1, O.id AS origin
FROM (VALUES %s) AS V(rev, org)
JOIN origin AS O ON (O.sha1=V.org))
ON CONFLICT (sha1) DO
UPDATE SET origin=EXCLUDED.origin
"""
psycopg2.extras.execute_values(self.cursor, sql, origins.items())
return True
except: # noqa: E722
# Unexpected error occurred, rollback all changes and log message
logging.exception("Unexpected error")
if self.raise_on_commit:
raise
return False
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
result: Dict[Sha1Git, RevisionData] = {}
sha1s = tuple(ids)
if sha1s:
values = ", ".join(itertools.repeat("%s", len(sha1s)))
sql = f"""
SELECT sha1, date, origin
FROM revision
WHERE sha1 IN ({values})
"""
self.cursor.execute(sql, sha1s)
result.update(
(row["sha1"], RevisionData(date=row["date"], origin=row["origin"]))
for row in self.cursor.fetchall()
)
return result
def relation_add(
self, relation: RelationType, data: Iterable[RelationData]
) -> bool:
try:
rows = tuple((rel.src, rel.dst, rel.path) for rel in data)
if rows:
table = relation.value
src, *_, dst = table.split("_")
if src != "origin":
# Origin entries should be inserted previously as they require extra
# non-null information
srcs = tuple(set((sha1,) for (sha1, _, _) in rows))
sql = f"""
LOCK TABLE ONLY {src};
INSERT INTO {src}(sha1) VALUES %s
ON CONFLICT DO NOTHING
"""
psycopg2.extras.execute_values(self.cursor, sql, srcs)
if dst != "origin":
# Origin entries should be inserted previously as they require extra
# non-null information
dsts = tuple(set((sha1,) for (_, sha1, _) in rows))
sql = f"""
LOCK TABLE ONLY {dst};
INSERT INTO {dst}(sha1) VALUES %s
ON CONFLICT DO NOTHING
"""
psycopg2.extras.execute_values(self.cursor, sql, dsts)
joins = [
f"INNER JOIN {src} AS S ON (S.sha1=V.src)",
f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)",
]
- selected = ["S.id", "D.id"]
+ nope = (RelationType.REV_BEFORE_REV, RelationType.REV_IN_ORG)
+ selected = ["S.id"]
+ if self.denormalized and relation not in nope:
+ selected.append("ARRAY_AGG(D.id)")
+ else:
+ selected.append("D.id")
if self._relation_uses_location_table(relation):
locations = tuple(set((path,) for (_, _, path) in rows))
sql = """
LOCK TABLE ONLY location;
INSERT INTO location(path) VALUES %s
ON CONFLICT (path) DO NOTHING
"""
psycopg2.extras.execute_values(self.cursor, sql, locations)
joins.append("INNER JOIN location AS L ON (L.path=V.path)")
- selected.append("L.id")
+ if self.denormalized:
+ selected.append("ARRAY_AGG(L.id)")
+ else:
+ selected.append("L.id")
+ sql_l = [
+ f"INSERT INTO {table}",
+ f" SELECT {', '.join(selected)}",
+ " FROM (VALUES %s) AS V(src, dst, path)",
+ *joins,
+ ]
- sql = f"""
- INSERT INTO {table}
- (SELECT {", ".join(selected)}
- FROM (VALUES %s) AS V(src, dst, path)
- {'''
- '''.join(joins)})
- ON CONFLICT DO NOTHING
+ if self.denormalized and relation not in nope:
+ sql_l.append("GROUP BY S.id")
+ sql_l.append(
+ f"""ON CONFLICT ({src}) DO UPDATE
+ SET {dst}=ARRAY(
+ SELECT UNNEST({table}.{dst} || excluded.{dst})),
+ location=ARRAY(
+ SELECT UNNEST({relation.value}.location || excluded.location))
"""
+ )
+ else:
+ sql_l.append("ON CONFLICT DO NOTHING")
+ sql = "\n".join(sql_l)
psycopg2.extras.execute_values(self.cursor, sql, rows)
return True
except: # noqa: E722
# Unexpected error occurred, rollback all changes and log message
logging.exception("Unexpected error")
if self.raise_on_commit:
raise
return False
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Set[RelationData]:
return self._relation_get(relation, ids, reverse)
def relation_get_all(self, relation: RelationType) -> Set[RelationData]:
return self._relation_get(relation, None)
def _entity_get_date(
self,
entity: Literal["content", "directory", "revision"],
ids: Iterable[Sha1Git],
) -> Dict[Sha1Git, datetime]:
dates: Dict[Sha1Git, datetime] = {}
sha1s = tuple(ids)
if sha1s:
values = ", ".join(itertools.repeat("%s", len(sha1s)))
sql = f"""
SELECT sha1, date
FROM {entity}
WHERE sha1 IN ({values})
"""
self.cursor.execute(sql, sha1s)
dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall())
return dates
def _entity_set_date(
self,
entity: Literal["content", "directory", "revision"],
data: Dict[Sha1Git, datetime],
) -> bool:
try:
if data:
sql = f"""
LOCK TABLE ONLY {entity};
INSERT INTO {entity}(sha1, date) VALUES %s
ON CONFLICT (sha1) DO
UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
"""
psycopg2.extras.execute_values(self.cursor, sql, data.items())
return True
except: # noqa: E722
# Unexpected error occurred, rollback all changes and log message
logging.exception("Unexpected error")
if self.raise_on_commit:
raise
return False
def _relation_get(
self,
relation: RelationType,
ids: Optional[Iterable[Sha1Git]],
reverse: bool = False,
) -> Set[RelationData]:
result: Set[RelationData] = set()
sha1s: Optional[Tuple[Tuple[Sha1Git, ...]]]
if ids is not None:
sha1s = (tuple(ids),)
- where = f"WHERE {'S.sha1' if not reverse else 'D.sha1'} IN %s"
+ where = f"WHERE {'S' if not reverse else 'D'}.sha1 IN %s"
else:
sha1s = None
where = ""
+ aggreg_dst = self.denormalized and relation in (
+ RelationType.CNT_EARLY_IN_REV,
+ RelationType.CNT_IN_DIR,
+ RelationType.DIR_IN_REV,
+ )
if sha1s is None or sha1s[0]:
table = relation.value
src, *_, dst = table.split("_")
# TODO: improve this!
if src == "revision" and dst == "revision":
src_field = "prev"
dst_field = "next"
else:
src_field = src
dst_field = dst
- joins = [
- f"INNER JOIN {src} AS S ON (S.id=R.{src_field})",
- f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})",
- ]
- selected = ["S.sha1 AS src", "D.sha1 AS dst"]
+ if aggreg_dst:
+ revloc = f"UNNEST(R.{dst_field}) AS dst"
+ if self._relation_uses_location_table(relation):
+ revloc += ", UNNEST(R.location) AS path"
+ else:
+ revloc = f"R.{dst_field} AS dst"
+ if self._relation_uses_location_table(relation):
+ revloc += ", R.location AS path"
+
+ inner_sql = f"""
+ SELECT S.sha1 AS src, {revloc}
+ FROM {table} AS R
+ INNER JOIN {src} AS S ON (S.id=R.{src_field})
+ {where}
+ """
if self._relation_uses_location_table(relation):
- joins.append("INNER JOIN location AS L ON (L.id=R.location)")
- selected.append("L.path AS path")
+ loc = "L.path AS path"
else:
- selected.append("NULL AS path")
-
+ loc = "NULL AS path"
sql = f"""
- SELECT {", ".join(selected)}
- FROM {table} AS R
- {" ".join(joins)}
- {where}
- """
+ SELECT CL.src, D.sha1 AS dst, {loc}
+ FROM ({inner_sql}) AS CL
+ INNER JOIN {dst} AS D ON (D.id=CL.dst)
+ """
+ if self._relation_uses_location_table(relation):
+ sql += "INNER JOIN location AS L ON (L.id=CL.path)"
+
self.cursor.execute(sql, sha1s)
result.update(RelationData(**row) for row in self.cursor.fetchall())
return result
def _relation_uses_location_table(self, relation: RelationType) -> bool:
...
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
index 149458d..6ec25a1 100644
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,75 +1,149 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Generator, Optional
from swh.model.model import Sha1Git
from ..interface import ProvenanceResult, RelationType
from .provenancedb_base import ProvenanceDBBase
class ProvenanceWithPathDB(ProvenanceDBBase):
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- sql = """
+ if self.denormalized:
+ sql = """
+ SELECT C_L.sha1 AS content,
+ R.sha1 AS revision,
+ R.date AS date,
+ O.url AS origin,
+ L.path AS path
+ FROM (
+            SELECT C.sha1 AS sha1,
+ UNNEST(revision) AS revision,
+ UNNEST(location) AS location
+ FROM content_in_revision AS C_R
+ INNER JOIN content AS C ON (C.id=C_R.content)
+ WHERE C.sha1=%s) AS C_L
+ INNER JOIN revision AS R ON (R.id=C_L.revision)
+ INNER JOIN location AS L ON (L.id=C_L.location)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
+ ORDER BY date, revision, origin, path ASC LIMIT 1
+ """
+ else:
+ sql = """
SELECT C.sha1 AS content,
R.sha1 AS revision,
R.date AS date,
O.url AS origin,
L.path AS path
FROM content AS C
INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
INNER JOIN location as L ON (CR.location=L.id)
INNER JOIN revision as R ON (CR.revision=R.id)
- LEFT JOIN origin as O ON (R.origin=O.id)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s
ORDER BY date, revision, origin, path ASC LIMIT 1
"""
self.cursor.execute(sql, (id,))
row = self.cursor.fetchone()
return ProvenanceResult(**row) if row is not None else None
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
early_cut = f"LIMIT {limit}" if limit is not None else ""
- sql = f"""
+ if self.denormalized:
+ sql = f"""
+ (SELECT C_L.sha1 AS content,
+ R.sha1 AS revision,
+ R.date AS date,
+ O.url AS origin,
+ L.path AS path
+ FROM (
+ SELECT C.sha1 AS sha1,
+ unnest(revision) AS revision,
+ unnest(location) AS location
+ FROM content_in_revision AS C_R
+ INNER JOIN content AS C ON (C.id = C_R.content)
+ WHERE C.sha1=%s) AS C_L
+ INNER JOIN revision AS R ON (R.id = C_L.revision)
+ INNER JOIN location AS L ON (L.id = C_L.location)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
+ )
+ UNION
+ (WITH
+ C_D as (
+ SELECT C.sha1 AS content_sha1,
+ unnest(CD.directory) AS directory,
+ unnest(CD.location) AS location
+ FROM content AS C
+ INNER JOIN content_in_directory AS CD ON (CD.content = C.id)
+ WHERE C.sha1=%s
+ ),
+ D_R as (
+ SELECT C_D.content_sha1 AS content_sha1,
+ DL.path AS file_path,
+ unnest(DR.revision) AS revision,
+ unnest(DR.location) AS prefix_location
+ FROM C_D
+ INNER JOIN directory_in_revision AS DR ON (DR.directory = C_D.directory)
+ INNER JOIN location AS DL ON (DL.id = C_D.location)
+ )
+ SELECT D_R.content_sha1 AS sha1,
+ R.sha1 AS revision,
+ R.date AS date,
+ O.url AS origin,
+ CASE DL.path
+ WHEN '' THEN D_R.file_path
+ WHEN '.' THEN D_R.file_path
+ ELSE (DL.path || '/' || D_R.file_path)::unix_path
+ END AS path
+ FROM D_R
+ INNER JOIN location AS DL ON (D_R.prefix_location = DL.id)
+ INNER JOIN revision AS R ON (D_R.revision = R.id)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
+ )
+ ORDER BY date, revision, origin, path {early_cut}
+ """
+ else:
+ sql = f"""
(SELECT C.sha1 AS content,
R.sha1 AS revision,
R.date AS date,
O.url AS origin,
L.path AS path
FROM content AS C
INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
INNER JOIN location AS L ON (CR.location=L.id)
INNER JOIN revision AS R ON (CR.revision=R.id)
- LEFT JOIN origin as O ON (R.origin=O.id)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s)
UNION
(SELECT C.sha1 AS content,
R.sha1 AS revision,
R.date AS date,
O.url AS origin,
CASE DL.path
WHEN '' THEN CL.path
WHEN '.' THEN CL.path
ELSE (DL.path || '/' || CL.path)::unix_path
END AS path
FROM content AS C
INNER JOIN content_in_directory AS CD ON (C.id=CD.content)
INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory)
INNER JOIN revision AS R ON (DR.revision=R.id)
INNER JOIN location AS CL ON (CD.location=CL.id)
INNER JOIN location AS DL ON (DR.location=DL.id)
LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s)
ORDER BY date, revision, origin, path {early_cut}
"""
self.cursor.execute(sql, (id, id))
yield from (ProvenanceResult(**row) for row in self.cursor.fetchall())
def _relation_uses_location_table(self, relation: RelationType) -> bool:
src, *_ = relation.value.split("_")
return src in ("content", "directory")
diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
index 30ceda3..3d5f323 100644
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,66 +1,127 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Generator, Optional
from swh.model.model import Sha1Git
from ..provenance import ProvenanceResult, RelationType
from .provenancedb_base import ProvenanceDBBase
class ProvenanceWithoutPathDB(ProvenanceDBBase):
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- sql = """
+ if self.denormalized:
+ sql = """
+ SELECT C_L.sha1 AS content,
+ R.sha1 AS revision,
+ R.date AS date,
+ O.url AS origin,
+ '\\x'::bytea AS path
+ FROM (
+ SELECT C.sha1, UNNEST(revision) AS revision
+ FROM content_in_revision AS C_R
+ INNER JOIN content AS C ON (C.id=C_R.content)
+ WHERE C.sha1=%s
+ ) AS C_L
+ INNER JOIN revision AS R ON (R.id=C_L.revision)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
+ ORDER BY date, revision, origin ASC LIMIT 1
+ """
+ else:
+ sql = """
SELECT C.sha1 AS content,
R.sha1 AS revision,
R.date AS date,
O.url AS origin,
- '\\x'::bytea as path
+ '\\x'::bytea AS path
FROM content AS C
- INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
- INNER JOIN revision as R ON (CR.revision=R.id)
- LEFT JOIN origin as O ON (R.origin=O.id)
+ INNER JOIN content_in_revision AS CR ON (CR.content = C.id)
+ INNER JOIN revision AS R ON (CR.revision = R.id)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s
ORDER BY date, revision, origin ASC LIMIT 1
"""
+
self.cursor.execute(sql, (id,))
row = self.cursor.fetchone()
return ProvenanceResult(**row) if row is not None else None
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
early_cut = f"LIMIT {limit}" if limit is not None else ""
- sql = f"""
+ if self.denormalized:
+ sql = f"""
+ (SELECT C_L.sha1 AS content,
+ R.sha1 AS revision,
+ R.date AS date,
+ O.url AS origin,
+ '\\x'::bytea as path
+ FROM (
+ SELECT C.sha1, UNNEST(revision) AS revision
+ FROM content_in_revision AS C_R
+ INNER JOIN content AS C ON (C.id=C_R.content)
+ WHERE C.sha1=%s) AS C_L
+ INNER JOIN revision AS R ON (R.id=C_L.revision)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
+ )
+ UNION
+ (WITH
+ C_D AS (
+ SELECT C.sha1 AS content_sha1,
+ unnest(CD.directory) AS directory
+ FROM content AS C
+ INNER JOIN content_in_directory AS CD ON (CD.content = C.id)
+ WHERE C.sha1=%s
+ ),
+ D_R AS (
+ SELECT C_D.content_sha1 AS content_sha1,
+ UNNEST(DR.revision) AS revision
+ FROM C_D
+ INNER JOIN directory_in_revision AS DR ON (DR.directory = C_D.directory)
+ )
+ SELECT D_R.content_sha1 AS content,
+ R.sha1 AS revision,
+ R.date AS date,
+ O.url AS origin,
+ '\\x'::bytea AS path
+ FROM D_R
+ INNER JOIN revision AS R ON (D_R.revision = R.id)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
+ )
+            ORDER BY date, revision, origin {early_cut}
+ """
+ else:
+ sql = f"""
(SELECT C.sha1 AS content,
R.sha1 AS revision,
R.date AS date,
O.url AS origin,
'\\x'::bytea as path
FROM content AS C
INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
INNER JOIN revision AS R ON (CR.revision=R.id)
- LEFT JOIN origin as O ON (R.origin=O.id)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s)
UNION
(SELECT C.sha1 AS content,
R.sha1 AS revision,
R.date AS date,
O.url AS origin,
- '\\x'::bytea as path
+ '\\x'::bytea AS path
FROM content AS C
INNER JOIN content_in_directory AS CD ON (C.id=CD.content)
INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory)
INNER JOIN revision AS R ON (DR.revision=R.id)
- LEFT JOIN origin as O ON (R.origin=O.id)
+ LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s)
ORDER BY date, revision, origin {early_cut}
"""
self.cursor.execute(sql, (id, id))
yield from (ProvenanceResult(**row) for row in self.cursor.fetchall())
def _relation_uses_location_table(self, relation: RelationType) -> bool:
return False
diff --git a/swh/provenance/sql/15-flavor.sql b/swh/provenance/sql/15-flavor.sql
index b270d0b..6a75587 100644
--- a/swh/provenance/sql/15-flavor.sql
+++ b/swh/provenance/sql/15-flavor.sql
@@ -1,21 +1,23 @@
-- database flavor
create type database_flavor as enum (
'with-path',
- 'without-path'
+ 'without-path',
+ 'with-path-denormalized',
+ 'without-path-denormalized'
);
comment on type database_flavor is 'Flavor of the current database';
create table dbflavor (
flavor database_flavor,
single_row char(1) primary key default 'x',
check (single_row = 'x')
);
comment on table dbflavor is 'Database flavor storage';
comment on column dbflavor.flavor is 'Database flavor currently deployed';
comment on column dbflavor.single_row is 'Bogus column to force the table to have a single row';
create or replace function swh_get_dbflavor() returns database_flavor language sql stable as $$
select coalesce((select flavor from dbflavor), 'with-path');
$$;
comment on function swh_get_dbflavor is 'Get the flavor of the database currently deployed';
diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql
index 8bf09a5..976d473 100644
--- a/swh/provenance/sql/30-schema.sql
+++ b/swh/provenance/sql/30-schema.sql
@@ -1,135 +1,151 @@
-- psql variables to get the current database flavor
+select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset
create table dbversion
(
version int primary key,
release timestamptz,
description text
);
comment on table dbversion is 'Details of current db version';
comment on column dbversion.version is 'SQL schema version';
comment on column dbversion.release is 'Version deployment timestamp';
comment on column dbversion.description is 'Release description';
-- latest schema version
insert into dbversion(version, release, description)
values(1, now(), 'Work In Progress');
-- a Git object ID, i.e., a Git-style salted SHA1 checksum
create domain sha1_git as bytea check (length(value) = 20);
-- UNIX path (absolute, relative, individual path component, etc.)
create domain unix_path as bytea;
-- entity tables
create table content
(
id bigserial primary key, -- internal identifier of the content blob
sha1 sha1_git unique not null, -- intrinsic identifier of the content blob
date timestamptz -- timestamp of the revision where the blob appears early
);
comment on column content.id is 'Content internal identifier';
comment on column content.sha1 is 'Content intrinsic identifier';
comment on column content.date is 'Earliest timestamp for the content (first seen time)';
create table directory
(
id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier
sha1 sha1_git unique not null, -- intrinsic identifier of the directory
date timestamptz -- max timestamp among those of the directory children's
);
comment on column directory.id is 'Directory internal identifier';
comment on column directory.sha1 is 'Directory intrinsic identifier';
comment on column directory.date is 'Latest timestamp for the content in the directory';
create table revision
(
id bigserial primary key, -- internal identifier of the revision
sha1 sha1_git unique not null, -- intrinsic identifier of the revision
date timestamptz, -- timestamp of the revision
origin bigint -- id of the preferred origin
-- foreign key (org) references origin (id)
);
comment on column revision.id is 'Revision internal identifier';
comment on column revision.sha1 is 'Revision intrinsic identifier';
comment on column revision.date is 'Revision timestamp';
comment on column revision.origin is 'preferred origin for the revision';
create table location
(
id bigserial primary key, -- internal identifier of the location
path unix_path unique not null -- path to the location
);
comment on column location.id is 'Location internal identifier';
comment on column location.path is 'Path to the location';
create table origin
(
id bigserial primary key, -- internal identifier of the origin
sha1 sha1_git unique not null, -- intrinsic identifier of the origin
url unix_path unique not null -- url of the origin
);
comment on column origin.id is 'Origin internal identifier';
comment on column origin.sha1 is 'Origin intrinsic identifier';
comment on column origin.url is 'URL of the origin';
-- relation tables
create table content_in_revision
(
content bigint not null, -- internal identifier of the content blob
+\if :dbflavor_norm
revision bigint not null, -- internal identifier of the revision where the blob appears for the first time
location bigint -- location of the content relative to the revision root directory
+\else
+ revision bigint[], -- internal identifier of the revision where the blob appears for the first time
+ location bigint[] -- location of the content relative to the revision root directory
+\endif
-- foreign key (blob) references content (id),
-- foreign key (rev) references revision (id),
-- foreign key (loc) references location (id)
);
comment on column content_in_revision.content is 'Content internal identifier';
comment on column content_in_revision.revision is 'Revision internal identifier';
comment on column content_in_revision.location is 'Location of content in revision';
create table content_in_directory
(
content bigint not null, -- internal identifier of the content blob
+\if :dbflavor_norm
directory bigint not null, -- internal identifier of the directory containing the blob
location bigint -- location of the content relative to its parent directory in the isochrone frontier
+\else
+ directory bigint[],
+ location bigint[]
+\endif
-- foreign key (blob) references content (id),
-- foreign key (dir) references directory (id),
-- foreign key (loc) references location (id)
);
comment on column content_in_directory.content is 'Content internal identifier';
comment on column content_in_directory.directory is 'Directory internal identifier';
comment on column content_in_directory.location is 'Location of content in directory';
create table directory_in_revision
(
directory bigint not null, -- internal identifier of the directory appearing in the revision
+\if :dbflavor_norm
revision bigint not null, -- internal identifier of the revision containing the directory
location bigint -- location of the directory relative to the revision root directory
+\else
+ revision bigint[],
+ location bigint[]
+\endif
-- foreign key (dir) references directory (id),
-- foreign key (rev) references revision (id),
-- foreign key (loc) references location (id)
);
comment on column directory_in_revision.directory is 'Directory internal identifier';
comment on column directory_in_revision.revision is 'Revision internal identifier';
comment on column directory_in_revision.location is 'Location of directory in revision';
create table revision_in_origin
(
revision bigint not null, -- internal identifier of the revision pointed to by the origin
origin bigint not null -- internal identifier of the origin that points to the revision
-- foreign key (rev) references revision (id),
-- foreign key (org) references origin (id)
);
comment on column revision_in_origin.revision is 'Revision internal identifier';
comment on column revision_in_origin.origin is 'Origin internal identifier';
create table revision_before_revision
(
prev bigserial not null, -- internal identifier of the source revision
next bigserial not null -- internal identifier of the destination revision
-- foreign key (prev) references revision (id),
-- foreign key (next) references revision (id)
);
comment on column revision_before_revision.prev is 'Source revision internal identifier';
comment on column revision_before_revision.next is 'Destination revision internal identifier';
diff --git a/swh/provenance/sql/60-indexes.sql b/swh/provenance/sql/60-indexes.sql
index 5a51468..6141077 100644
--- a/swh/provenance/sql/60-indexes.sql
+++ b/swh/provenance/sql/60-indexes.sql
@@ -1,9 +1,18 @@
-- psql variables to get the current database flavor
+select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset
-- create unique indexes (instead of pkey) because location might be null for
-- the without-path flavor
+\if :dbflavor_norm
create unique index on content_in_revision(content, revision, location);
create unique index on directory_in_revision(directory, revision, location);
create unique index on content_in_directory(content, directory, location);
+\else
+create unique index on content_in_revision(content);
+create unique index on directory_in_revision(directory);
+create unique index on content_in_directory(content);
+\endif
+
+
alter table revision_in_origin add primary key (revision, origin);
alter table revision_before_revision add primary key (prev, next);
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index b07f481..aa54503 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,273 +1,283 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from os import path
import re
from typing import Any, Dict, Iterable, Iterator, List, Optional
import msgpack
import psycopg2
import pytest
from typing_extensions import TypedDict
from swh.journal.serializers import msgpack_ext_hook
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.provenance import get_provenance, get_provenance_storage
from swh.provenance.api.client import RemoteProvenanceStorage
import swh.provenance.api.server as server
from swh.provenance.archive import ArchiveInterface
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface
from swh.provenance.postgresql.archive import ArchivePostgreSQL
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage.postgresql.storage import Storage
from swh.storage.replay import process_replay_objects
-@pytest.fixture(params=["with-path", "without-path"])
+@pytest.fixture(
+ params=[
+ "with-path",
+ "without-path",
+ "with-path-denormalized",
+ "without-path-denormalized",
+ ]
+)
def populated_db(
request, # TODO: add proper type annotation
postgresql: psycopg2.extensions.connection,
) -> Dict[str, str]:
+ """return a working and initialized provenance db"""
from swh.core.cli.db import populate_database_for_package
- flavor = "with-path" if request.param == "client-server" else request.param
- populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor)
+ # flavor = "with-path" if request.param == "client-server" else request.param
+ populate_database_for_package(
+ "swh.provenance", postgresql.dsn, flavor=request.param
+ )
return {
- item.split("=")[0]: item.split("=")[1]
- for item in postgresql.dsn.split()
- if item.split("=")[0] != "options"
+ k: v
+ for (k, v) in (item.split("=") for item in postgresql.dsn.split())
+ if k != "options"
}
# the Flask app used as server in these tests
@pytest.fixture
def app(populated_db: Dict[str, str]):
    """Yield the provenance RPC Flask app, backed by ``populated_db``.

    The server module exposes a ``storage`` attribute that we patch with a
    local provenance storage before handing out the app.
    """
    # fail fast if the server module layout changed
    assert hasattr(server, "storage")
    server.storage = get_provenance_storage(cls="local", db=populated_db)
    yield server.app
# the RPCClient class used as client in these tests
@pytest.fixture
def swh_rpc_client_class():
    """Provide the RPC client class — presumably picked up by the
    ``swh_rpc_client`` fixture machinery; verify against swh.core testing
    fixtures."""
    return RemoteProvenanceStorage
@pytest.fixture(params=["local", "remote"])
def provenance(
    request,  # TODO: add proper type annotation
    populated_db: Dict[str, str],
    swh_rpc_client: RemoteProvenanceStorage,
) -> ProvenanceInterface:
    """return a working and initialized provenance db"""
    if request.param != "remote":
        # in test sessions, we DO want to raise any exception occurring at
        # commit time
        return get_provenance(cls=request.param, db=populated_db, raise_on_commit=True)

    from swh.provenance.provenance import Provenance

    assert isinstance(swh_rpc_client, ProvenanceStorageInterface)
    return Provenance(swh_rpc_client)
@pytest.fixture
def swh_storage_with_objects(swh_storage: Storage) -> Storage:
    """Return a Storage object (postgresql-based by default) pre-loaded with
    a few objects of each type.

    The inserted content comes from swh.model.tests.swh_model_data.
    """
    object_types = [
        "content",
        "skipped_content",
        "directory",
        "revision",
        "release",
        "snapshot",
        "origin",
        "origin_visit",
        "origin_visit_status",
    ]
    for object_type in object_types:
        # each object type has a matching <type>_add method on Storage
        adder = getattr(swh_storage, f"{object_type}_add")
        adder(TEST_OBJECTS[object_type])
    return swh_storage
@pytest.fixture
def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface:
    """Archive interface that queries the storage database directly."""
    db_connection = swh_storage_with_objects.get_db().conn
    return ArchivePostgreSQL(db_connection)
@pytest.fixture
def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface:
    """Archive interface that goes through the swh.storage API."""
    return ArchiveStorage(swh_storage_with_objects)
@pytest.fixture(params=["archive", "db"])
def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]:
    """Return an ArchiveInterface object, either storage-API based or
    querying the database directly."""
    if request.param != "db":
        yield ArchiveStorage(swh_storage_with_objects)
        return
    # this is a workaround to prevent tests from hanging because of an
    # unclosed transaction.
    # TODO: refactor the ArchivePostgreSQL to properly deal with
    # transactions and get rid of this fixture
    direct = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn)
    yield direct
    direct.conn.rollback()
def get_datafile(fname: str) -> str:
    """Return the path of test data file ``fname``, located in the data/
    directory next to this module."""
    data_dir = path.join(path.dirname(__file__), "data")
    return path.join(data_dir, fname)
def load_repo_data(repo: str) -> Dict[str, Any]:
    """Load the msgpack-serialized test repository ``repo`` from the data/
    directory.

    Returns a mapping from object type to the list of decoded objects.
    """
    result: Dict[str, Any] = {}
    with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
        unpacker = msgpack.Unpacker(
            fobj,
            raw=False,
            ext_hook=msgpack_ext_hook,
            strict_map_key=False,
            timestamp=3,  # convert Timestamp in datetime objects (tz UTC)
        )
        for objtype, objdict in unpacker:
            result.setdefault(objtype, []).append(objdict)
    return result
def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]:
    """Return a copy of ``d`` restricted to the keys listed in ``keys``.

    ``keys`` is materialized into a set first: repeated ``in`` tests on a
    one-shot iterator would consume it and silently drop entries, and the
    set makes each membership test O(1).
    """
    wanted = set(keys)
    return {k: v for (k, v) in d.items() if k in wanted}
def fill_storage(storage: Storage, data: Dict[str, Any]) -> None:
    """Load ``data`` (a mapping from object type to list of objects,
    typically the dict returned by ``load_repo_data``) into ``storage``
    via the journal replayer."""
    process_replay_objects(data, storage=storage)
class SynthRelation(TypedDict):
    # directory-path prefix (for D-C rows: the path of the last R-D
    # relation); None when the relation has no prefix
    prefix: Optional[str]
    # location (path) of the relation's target
    path: str
    # sha1_git of the source of the relation
    src: Sha1Git
    # sha1_git of the destination of the relation
    dst: Sha1Git
    # timestamp of the target of the relation
    # (related to the timestamp of the revision)
    rel_ts: float
class SynthRevision(TypedDict):
    # sha1 of the revision
    sha1: Sha1Git
    # timestamp of the revision
    date: float
    # commit message of the revision (the R name from the synthetic file)
    msg: str
    # new R---C (revision/content) relations added by this revision
    R_C: List[SynthRelation]
    # new R-D (revision/directory) relations added by this revision
    R_D: List[SynthRelation]
    # new D-C (directory/content) relations added by this revision
    D_C: List[SynthRelation]
def synthetic_result(filename: str) -> Iterator[SynthRevision]:
    """Generate dict representations of the synthetic revisions found in the
    synthetic file (from the data/ directory) given as argument.

    Each generated SynthRevision (typed dict) carries:

        "sha1": (Sha1Git) sha1 of the revision,
        "date": (float) timestamp of the revision,
        "msg": (str) commit message of the revision,
        "R_C": (list) new R---C relations added by this revision
        "R_D": (list) new R-D relations added by this revision
        "D_C": (list) new D-C relations added by this revision

    Each relation above is a SynthRelation typed dict with:

        "path": (str) location
        "src": (Sha1Git) sha1 of the source of the relation
        "dst": (Sha1Git) sha1 of the destination of the relation
        "rel_ts": (float) timestamp of the target of the relation
        (related to the timestamp of the revision)
    """
    with open(get_datafile(filename), "r") as synth_file:
        for revision in _parse_synthetic_file(synth_file):
            yield revision
def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]:
    """Read a 'synthetic' file and generate a dict representation of the
    synthetic revision for each revision listed in the synthetic file.

    Lines that do not match the expected ``|``-separated row format are
    ignored; a row carrying a revision name (``Rnn``) starts a new revision.
    """
    regs = [
        r"(?P<revname>R[0-9]{2,4})?",
        r"(?P<reltype>[^| ]*)",
        r"([+] )?(?P<path>[^| +]*?)[/]?",
        r"(?P<type>[RDC]) (?P<sha1>[0-9a-z]{40})",
        # timestamp: optional sign, integer part, optional fractional part.
        # The decimal point is escaped as [.]: the previous bare "." matched
        # ANY character, so e.g. "12x34" would have been accepted as a ts
        # and crashed later in float().
        r"(?P<ts>-?[0-9]+([.][0-9]+)?)",
    ]
    regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$")
    current_rev: List[dict] = []
    for m in (regex.match(line) for line in fobj):
        if m:
            d = m.groupdict()
            if d["revname"]:
                # a named row starts a new revision: flush the previous one
                if current_rev:
                    yield _mk_synth_rev(current_rev)
                current_rev.clear()
            current_rev.append(d)
    # flush the trailing revision, if any
    if current_rev:
        yield _mk_synth_rev(current_rev)
def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision:
    """Assemble one SynthRevision from the parsed rows of a synthetic file.

    The first row must describe the revision itself (type "R"); the
    following rows are its R---C, R-D and D-C relations.
    """
    header = synth_rev[0]
    assert header["type"] == "R"
    rev = SynthRevision(
        sha1=hash_to_bytes(header["sha1"]),
        date=float(header["ts"]),
        msg=header["revname"],
        R_C=[],
        R_D=[],
        D_C=[],
    )

    def make_relation(
        row: Dict[str, str], src: Sha1Git, prefix: Optional[str]
    ) -> SynthRelation:
        # one parsed row -> one SynthRelation entry
        return SynthRelation(
            prefix=prefix,
            path=row["path"],
            src=src,
            dst=hash_to_bytes(row["sha1"]),
            rel_ts=float(row["ts"]),
        )

    # path of the last R-D relation we parsed, used as prefix for
    # subsequent D-C relations
    current_path = None
    for row in synth_rev[1:]:
        reltype = row["reltype"]
        if reltype == "R---C":
            assert row["type"] == "C"
            rev["R_C"].append(make_relation(row, rev["sha1"], None))
            current_path = None
        elif reltype == "R-D":
            assert row["type"] == "D"
            rev["R_D"].append(make_relation(row, rev["sha1"], None))
            current_path = row["path"]
        elif reltype == "D-C":
            assert row["type"] == "C"
            rev["D_C"].append(
                make_relation(row, rev["R_D"][-1]["dst"], current_path)
            )
    return rev
diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py
index 4b684a0..deeaf09 100644
--- a/swh/provenance/tests/test_provenance_db.py
+++ b/swh/provenance/tests/test_provenance_db.py
@@ -1,51 +1,56 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
from typing import Type
from swh.model.model import OriginVisitStatus
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface
from swh.provenance.model import OriginEntry
from swh.provenance.origin import origin_add
from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase
from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB
from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage.postgresql.storage import Storage
# TODO: remove this function in favour of TimestampWithTimezone.to_datetime
# from swh.model.model
def ts2dt(ts: dict) -> datetime:
    """Convert a model-style timestamp dict into a tz-aware datetime.

    ``ts`` has the shape ``{"timestamp": {"seconds": int,
    "microseconds": int}, "offset": int}`` where ``offset`` is the UTC
    offset in minutes.
    """
    tz = timezone(timedelta(minutes=ts["offset"]))
    seconds = ts["timestamp"]["seconds"]
    microseconds = ts["timestamp"]["microseconds"]
    return datetime.fromtimestamp(seconds, tz).replace(microsecond=microseconds)
def test_provenance_origin_add(
    provenance: ProvenanceInterface, swh_storage_with_objects: Storage
) -> None:
    """Test the origin_add function"""
    archive = ArchiveStorage(swh_storage_with_objects)
    for status in TEST_OBJECTS["origin_visit_status"]:
        assert isinstance(status, OriginVisitStatus)
        # only statuses that point at a snapshot yield an origin entry
        if status.snapshot is None:
            continue
        entry = OriginEntry(url=status.origin, snapshot=status.snapshot)
        origin_add(provenance, archive, [entry])
    # TODO: check some facts here
def test_provenance_flavor(provenance: ProvenanceInterface) -> None:
if isinstance(provenance.storage, ProvenanceDBBase):
- assert provenance.storage.flavor in ("with-path", "without-path")
+ assert provenance.storage.flavor in (
+ "with-path",
+ "without-path",
+ "with-path-denormalized",
+ "without-path-denormalized",
+ )
backend_class: Type[ProvenanceStorageInterface]
- if provenance.storage.flavor == "with-path":
+ if "with-path" in provenance.storage.flavor:
backend_class = ProvenanceWithPathDB
else:
backend_class = ProvenanceWithoutPathDB
assert isinstance(provenance.storage, backend_class)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 6:23 PM (5 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3316163
Attached To
rDPROV Provenance database
Event Timeline
Log In to Comment