Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenancedb_base.py
from datetime import datetime | from datetime import datetime | ||||
import itertools | import itertools | ||||
import logging | import logging | ||||
from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple | from typing import Dict, Generator, Iterable, Optional, Set, Tuple | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from typing_extensions import Literal | |||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from ..provenance import ProvenanceResult | from ..provenance import ProvenanceResult, RelationType | ||||
class ProvenanceDBBase: | class ProvenanceDBBase: | ||||
raise_on_commit: bool = False | |||||
def __init__(self, conn: psycopg2.extensions.connection): | def __init__(self, conn: psycopg2.extensions.connection): | ||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | ||||
conn.set_session(autocommit=True) | conn.set_session(autocommit=True) | ||||
self.conn = conn | self.conn = conn | ||||
self.cursor = self.conn.cursor() | self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | ||||
# XXX: not sure this is the best place to do it! | # XXX: not sure this is the best place to do it! | ||||
self.cursor.execute("SET timezone TO 'UTC'") | sql = "SET timezone TO 'UTC'" | ||||
self.cursor.execute(sql) | |||||
self._flavor: Optional[str] = None | self._flavor: Optional[str] = None | ||||
douardda: For a oneliner like this, using an intermediate local variable seems a bit overkill to me. | |||||
@property | @property | ||||
def flavor(self) -> str: | def flavor(self) -> str: | ||||
if self._flavor is None: | if self._flavor is None: | ||||
self.cursor.execute("select swh_get_dbflavor()") | sql = "SELECT swh_get_dbflavor() AS flavor" | ||||
self._flavor = self.cursor.fetchone()[0] | self.cursor.execute(sql) | ||||
self._flavor = self.cursor.fetchone()["flavor"] | |||||
Done Inline Actionssame as above douardda: same as above | |||||
assert self._flavor is not None | assert self._flavor is not None | ||||
return self._flavor | return self._flavor | ||||
@property | @property | ||||
def with_path(self) -> bool: | def with_path(self) -> bool: | ||||
return self.flavor == "with-path" | return self.flavor == "with-path" | ||||
def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool: | |||||
try: | |||||
# First insert entities | |||||
for entity in ("content", "directory", "revision"): | |||||
self.insert_entity( | |||||
entity, | |||||
{ | |||||
sha1: data[entity]["data"][sha1] | |||||
for sha1 in data[entity]["added"] | |||||
}, | |||||
) | |||||
data[entity]["data"].clear() | |||||
data[entity]["added"].clear() | |||||
# Relations should come after ids for entities were resolved | |||||
for relation in ( | |||||
"content_in_revision", | |||||
"content_in_directory", | |||||
"directory_in_revision", | |||||
): | |||||
self.insert_relation(relation, data[relation]) | |||||
# Insert origins | |||||
self.insert_origin( | |||||
{ | |||||
sha1: data["origin"]["data"][sha1] | |||||
for sha1 in data["origin"]["added"] | |||||
}, | |||||
) | |||||
data["origin"]["data"].clear() | |||||
data["origin"]["added"].clear() | |||||
# Insert relations from the origin-revision layer | |||||
self.insert_revision_history(data["revision_before_revision"]) | |||||
self.insert_origin_head(data["revision_in_origin"]) | |||||
# Update preferred origins | |||||
self.update_preferred_origin( | |||||
{ | |||||
sha1: data["revision_origin"]["data"][sha1] | |||||
for sha1 in data["revision_origin"]["added"] | |||||
} | |||||
) | |||||
data["revision_origin"]["data"].clear() | |||||
data["revision_origin"]["added"].clear() | |||||
return True | |||||
except: # noqa: E722 | |||||
# Unexpected error occurred, rollback all changes and log message | |||||
logging.exception("Unexpected error") | |||||
if raise_on_commit: | |||||
raise | |||||
return False | |||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
... | ... | ||||
def content_find_all( | def content_find_all( | ||||
self, id: Sha1Git, limit: Optional[int] = None | self, id: Sha1Git, limit: Optional[int] = None | ||||
) -> Generator[ProvenanceResult, None, None]: | ) -> Generator[ProvenanceResult, None, None]: | ||||
... | ... | ||||
def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: | def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | ||||
dates = {} | return self._entity_set_date("content", dates) | ||||
if ids: | |||||
values = ", ".join(itertools.repeat("%s", len(ids))) | |||||
self.cursor.execute( | |||||
f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", | |||||
tuple(ids), | |||||
) | |||||
dates.update(self.cursor.fetchall()) | |||||
return dates | |||||
def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): | def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
if data: | return self._entity_get_date("content", ids) | ||||
psycopg2.extras.execute_values( | |||||
self.cursor, | |||||
f""" | |||||
LOCK TABLE ONLY {entity}; | |||||
INSERT INTO {entity}(sha1, date) VALUES %s | |||||
ON CONFLICT (sha1) DO | |||||
UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | |||||
""", | |||||
data.items(), | |||||
) | |||||
# XXX: not sure if Python takes a reference or a copy. | |||||
# This might be useless! | |||||
data.clear() | |||||
def insert_origin(self, data: Dict[Sha1Git, str]): | def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | ||||
if data: | return self._entity_set_date("directory", dates) | ||||
psycopg2.extras.execute_values( | |||||
self.cursor, | def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
""" | return self._entity_get_date("directory", ids) | ||||
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | |||||
try: | |||||
if urls: | |||||
sql = """ | |||||
LOCK TABLE ONLY origin; | LOCK TABLE ONLY origin; | ||||
INSERT INTO origin(sha1, url) VALUES %s | INSERT INTO origin(sha1, url) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""", | """ | ||||
data.items(), | psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | ||||
) | return True | ||||
# XXX: not sure if Python takes a reference or a copy. | except: # noqa: E722 | ||||
# This might be useless! | # Unexpected error occurred, rollback all changes and log message | ||||
data.clear() | logging.exception("Unexpected error") | ||||
if self.raise_on_commit: | |||||
raise | |||||
return False | |||||
def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]): | def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | ||||
if data: | urls: Dict[Sha1Git, str] = {} | ||||
# Insert revisions first, to ensure "foreign keys" exist | sha1s = tuple(ids) | ||||
# Origins are assumed to be already inserted (they require knowing the url) | if sha1s: | ||||
Done Inline Actionsthe "old ways" of doing this looks more readable to me: ", ".join(["%s"] * len(sha1s)) douardda: the "old ways" of doing this looks more readable to me:
```
", ".join(["%s"] * len(sha1s))
``` | |||||
Done Inline Actionsit's actually more cryptic to me... but I guess that's subjective aeviso: it's actually more cryptic to me... but I guess that's subjective | |||||
psycopg2.extras.execute_values( | values = ", ".join(itertools.repeat("%s", len(sha1s))) | ||||
self.cursor, | sql = f""" | ||||
SELECT sha1, url | |||||
FROM origin | |||||
WHERE sha1 IN ({values}) | |||||
""" | """ | ||||
LOCK TABLE ONLY revision; | self.cursor.execute(sql, sha1s) | ||||
INSERT INTO revision(sha1) VALUES %s | urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall()) | ||||
ON CONFLICT DO NOTHING | return urls | ||||
""", | |||||
set((rev,) for rev, _ in data), | |||||
) | |||||
psycopg2.extras.execute_values( | def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | ||||
self.cursor, | return self._entity_set_date("revision", dates) | ||||
# XXX: not clear how conflicts are handled here! | |||||
""" | |||||
LOCK TABLE ONLY revision_in_origin; | |||||
INSERT INTO revision_in_origin | |||||
SELECT R.id, O.id | |||||
FROM (VALUES %s) AS V(rev, org) | |||||
INNER JOIN revision AS R on (R.sha1=V.rev) | |||||
INNER JOIN origin AS O on (O.sha1=V.org) | |||||
ON CONFLICT DO NOTHING | |||||
""", | |||||
data, | |||||
) | |||||
data.clear() | |||||
def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): | def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | ||||
... | try: | ||||
if origins: | |||||
sql = """ | |||||
LOCK TABLE ONLY revision; | |||||
INSERT INTO revision(sha1, origin) | |||||
(SELECT V.rev AS sha1, O.id AS origin | |||||
FROM (VALUES %s) AS V(rev, org) | |||||
JOIN origin AS O ON (O.sha1=V.org)) | |||||
ON CONFLICT (sha1) DO | |||||
UPDATE SET origin=EXCLUDED.origin | |||||
""" | |||||
psycopg2.extras.execute_values(self.cursor, sql, origins.items()) | |||||
return True | |||||
except: # noqa: E722 | |||||
# Unexpected error occurred, rollback all changes and log message | |||||
logging.exception("Unexpected error") | |||||
if self.raise_on_commit: | |||||
raise | |||||
return False | |||||
def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]): | def revision_get( | ||||
self, ids: Iterable[Sha1Git] | |||||
) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]: | |||||
result: Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]] = {} | |||||
sha1s = tuple(ids) | |||||
if sha1s: | |||||
values = ", ".join(itertools.repeat("%s", len(sha1s))) | |||||
sql = f""" | |||||
SELECT sha1, date, origin | |||||
FROM revision | |||||
WHERE sha1 IN ({values}) | |||||
""" | |||||
self.cursor.execute(sql, sha1s) | |||||
result.update( | |||||
(row["sha1"], (row["date"], row["origin"])) | |||||
for row in self.cursor.fetchall() | |||||
) | |||||
return result | |||||
def relation_add( | |||||
self, | |||||
relation: RelationType, | |||||
data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]], | |||||
) -> bool: | |||||
try: | |||||
if data: | if data: | ||||
# print(f"Inserting histories: {data}") | table = relation.value | ||||
# Insert revisions first, to ensure "foreign keys" exist | src, *_, dst = table.split("_") | ||||
revisions = set(data) | |||||
for rev in data: | if src != "origin": | ||||
revisions.update(data[rev]) | # Origin entries should be inserted previously as they require extra | ||||
psycopg2.extras.execute_values( | # non-null information | ||||
self.cursor, | srcs = tuple(set((sha1,) for (sha1, _, _) in data)) | ||||
sql = f""" | |||||
LOCK TABLE ONLY {src}; | |||||
INSERT INTO {src}(sha1) VALUES %s | |||||
ON CONFLICT DO NOTHING | |||||
""" | """ | ||||
LOCK TABLE ONLY revision; | psycopg2.extras.execute_values(self.cursor, sql, srcs) | ||||
INSERT INTO revision(sha1) VALUES %s | if dst != "origin": | ||||
# Origin entries should be inserted previously as they require extra | |||||
# non-null information | |||||
dsts = tuple(set((sha1,) for (_, sha1, _) in data)) | |||||
sql = f""" | |||||
LOCK TABLE ONLY {dst}; | |||||
INSERT INTO {dst}(sha1) VALUES %s | |||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""", | """ | ||||
((rev,) for rev in revisions), | psycopg2.extras.execute_values(self.cursor, sql, dsts) | ||||
) | joins = [ | ||||
f"INNER JOIN {src} AS S ON (S.sha1=V.src)", | |||||
f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", | |||||
] | |||||
selected = ["S.id", "D.id"] | |||||
if self._relation_uses_location_table(relation): | |||||
locations = tuple(set((path,) for (_, _, path) in data)) | |||||
sql = """ | |||||
LOCK TABLE ONLY location; | |||||
INSERT INTO location(path) VALUES %s | |||||
ON CONFLICT (path) DO NOTHING | |||||
""" | |||||
psycopg2.extras.execute_values(self.cursor, sql, locations) | |||||
values = [[(prev, next) for next in data[prev]] for prev in data] | joins.append("INNER JOIN location AS L ON (L.path=V.path)") | ||||
psycopg2.extras.execute_values( | selected.append("L.id") | ||||
self.cursor, | |||||
# XXX: not clear how conflicts are handled here! | |||||
""" | |||||
LOCK TABLE ONLY revision_before_revision; | |||||
INSERT INTO revision_before_revision | |||||
SELECT P.id, N.id | |||||
FROM (VALUES %s) AS V(prev, next) | |||||
INNER JOIN revision AS P on (P.sha1=V.prev) | |||||
INNER JOIN revision AS N on (N.sha1=V.next) | |||||
ON CONFLICT DO NOTHING | |||||
""", | |||||
sum(values, []), | |||||
) | |||||
data.clear() | |||||
def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: | sql = f""" | ||||
self.cursor.execute( | INSERT INTO {table} | ||||
(SELECT {", ".join(selected)} | |||||
FROM (VALUES %s) AS V(src, dst, path) | |||||
{''' | |||||
'''.join(joins)}) | |||||
ON CONFLICT DO NOTHING | |||||
""" | """ | ||||
SELECT O.sha1 | psycopg2.extras.execute_values(self.cursor, sql, data) | ||||
FROM revision AS R | return True | ||||
JOIN origin as O | except: # noqa: E722 | ||||
ON R.origin=O.id | # Unexpected error occurred, rollback all changes and log message | ||||
WHERE R.sha1=%s""", | logging.exception("Unexpected error") | ||||
(revision,), | if self.raise_on_commit: | ||||
) | raise | ||||
row = self.cursor.fetchone() | return False | ||||
return row[0] if row is not None else None | |||||
def revision_in_history(self, revision: Sha1Git) -> bool: | def relation_get( | ||||
self.cursor.execute( | self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | ||||
) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: | |||||
result: Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = set() | |||||
sha1s = tuple(ids) | |||||
if sha1s: | |||||
table = relation.value | |||||
src, *_, dst = table.split("_") | |||||
# TODO: improve this! | |||||
if src == "revision" and dst == "revision": | |||||
src_field = "prev" | |||||
dst_field = "next" | |||||
else: | |||||
src_field = src | |||||
dst_field = dst | |||||
joins = [ | |||||
f"INNER JOIN {src} AS S ON (S.id=R.{src_field})", | |||||
f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})", | |||||
] | |||||
selected = ["S.sha1 AS src", "D.sha1 AS dst"] | |||||
selector = "S.sha1" if not reverse else "D.sha1" | |||||
if self._relation_uses_location_table(relation): | |||||
joins.append("INNER JOIN location AS L ON (L.id=R.location)") | |||||
selected.append("L.path AS path") | |||||
else: | |||||
selected.append("NULL AS path") | |||||
sql = f""" | |||||
SELECT {", ".join(selected)} | |||||
FROM {table} AS R | |||||
{" ".join(joins)} | |||||
WHERE {selector} IN %s | |||||
""" | """ | ||||
SELECT 1 | self.cursor.execute(sql, (sha1s,)) | ||||
FROM revision_before_revision | result.update( | ||||
JOIN revision | (row["src"], row["dst"], row["path"]) for row in self.cursor.fetchall() | ||||
ON revision.id=revision_before_revision.prev | ) | ||||
WHERE revision.sha1=%s | return result | ||||
""", | |||||
(revision,), | def _entity_get_date( | ||||
) | self, | ||||
return self.cursor.fetchone() is not None | entity: Literal["content", "directory", "revision"], | ||||
ids: Iterable[Sha1Git], | |||||
def revision_visited(self, revision: Sha1Git) -> bool: | ) -> Dict[Sha1Git, datetime]: | ||||
self.cursor.execute( | dates: Dict[Sha1Git, datetime] = {} | ||||
sha1s = tuple(ids) | |||||
if sha1s: | |||||
values = ", ".join(itertools.repeat("%s", len(sha1s))) | |||||
sql = f""" | |||||
SELECT sha1, date | |||||
FROM {entity} | |||||
WHERE sha1 IN ({values}) | |||||
""" | """ | ||||
SELECT 1 | self.cursor.execute(sql, sha1s) | ||||
FROM revision_in_origin | dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) | ||||
JOIN revision | return dates | ||||
ON revision.id=revision_in_origin.revision | |||||
WHERE revision.sha1=%s | |||||
""", | |||||
(revision,), | |||||
) | |||||
return self.cursor.fetchone() is not None | |||||
def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]): | def _entity_set_date( | ||||
self, | |||||
entity: Literal["content", "directory", "revision"], | |||||
data: Dict[Sha1Git, datetime], | |||||
) -> bool: | |||||
try: | |||||
if data: | if data: | ||||
# XXX: this is assuming the revision already exists in the db! It should | sql = f""" | ||||
# be improved by allowing null dates in the revision table. | LOCK TABLE ONLY {entity}; | ||||
psycopg2.extras.execute_values( | INSERT INTO {entity}(sha1, date) VALUES %s | ||||
self.cursor, | ON CONFLICT (sha1) DO | ||||
UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | |||||
""" | """ | ||||
UPDATE revision R | psycopg2.extras.execute_values(self.cursor, sql, data.items()) | ||||
SET origin=O.id | return True | ||||
FROM (VALUES %s) AS V(rev, org) | except: # noqa: E722 | ||||
INNER JOIN origin AS O on (O.sha1=V.org) | # Unexpected error occurred, rollback all changes and log message | ||||
WHERE R.sha1=V.rev | logging.exception("Unexpected error") | ||||
""", | if self.raise_on_commit: | ||||
data.items(), | raise | ||||
) | return False | ||||
data.clear() | |||||
def _relation_uses_location_table(self, relation: RelationType) -> bool: | |||||
... |
For a oneliner like this, using an intermediate local variable seems a bit overkill to me.