Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenancedb_base.py
from datetime import datetime | from datetime import datetime | ||||
import itertools | import itertools | ||||
import logging | import logging | ||||
from typing import Any, Dict, Generator, List, Optional, Set, Tuple | from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from ..provenance import ProvenanceResult | |||||
class ProvenanceDBBase: | class ProvenanceDBBase: | ||||
def __init__(self, conn: psycopg2.extensions.connection): | def __init__(self, conn: psycopg2.extensions.connection): | ||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | ||||
conn.set_session(autocommit=True) | conn.set_session(autocommit=True) | ||||
self.conn = conn | self.conn = conn | ||||
self.cursor = self.conn.cursor() | self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | ||||
douardda: I'm not convinced using a `RealDictCursor` for all queries really helps **here**, but meh. | |||||
# XXX: not sure this is the best place to do it! | # XXX: not sure this is the best place to do it! | ||||
self.cursor.execute("SET timezone TO 'UTC'") | self.cursor.execute("SET timezone TO 'UTC'") | ||||
self._flavor: Optional[str] = None | self._flavor: Optional[str] = None | ||||
@property | @property | ||||
def flavor(self) -> str: | def flavor(self) -> str: | ||||
if self._flavor is None: | if self._flavor is None: | ||||
self.cursor.execute("select swh_get_dbflavor()") | self.cursor.execute("SELECT swh_get_dbflavor() AS flavor") | ||||
self._flavor = self.cursor.fetchone()[0] | self._flavor = self.cursor.fetchone()["flavor"] | ||||
assert self._flavor is not None | assert self._flavor is not None | ||||
return self._flavor | return self._flavor | ||||
@property | @property | ||||
def with_path(self) -> bool: | def with_path(self) -> bool: | ||||
return self.flavor == "with-path" | return self.flavor == "with-path" | ||||
def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: | def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool: | ||||
try: | try: | ||||
# First insert entities | # First insert entities | ||||
for entity in ("content", "directory", "revision"): | for entity in ("content", "directory", "revision"): | ||||
self.insert_entity( | self.insert_entity( | ||||
entity, | entity, | ||||
{ | { | ||||
sha1: data[entity]["data"][sha1] | sha1: data[entity]["data"][sha1] | ||||
for sha1 in data[entity]["added"] | for sha1 in data[entity]["added"] | ||||
Show All 39 Lines | def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool: | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | logging.exception("Unexpected error") | ||||
if raise_on_commit: | if raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def content_find_first( | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
self, id: Sha1Git | |||||
) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: | |||||
... | ... | ||||
def content_find_all( | def content_find_all( | ||||
self, id: Sha1Git, limit: Optional[int] = None | self, id: Sha1Git, limit: Optional[int] = None | ||||
) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: | ) -> Generator[ProvenanceResult, None, None]: | ||||
... | ... | ||||
def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: | def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
dates = {} | dates: Dict[Sha1Git, datetime] = {} | ||||
if ids: | if ids: | ||||
values = ", ".join(itertools.repeat("%s", len(ids))) | values = ", ".join(itertools.repeat("%s", len(ids))) | ||||
self.cursor.execute( | self.cursor.execute( | ||||
f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", | f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", | ||||
tuple(ids), | tuple(ids), | ||||
) | ) | ||||
dates.update(self.cursor.fetchall()) | dates.update(((row["sha1"], row["date"]) for row in self.cursor.fetchall())) | ||||
return dates | return dates | ||||
def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): | def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): | ||||
if data: | if data: | ||||
psycopg2.extras.execute_values( | psycopg2.extras.execute_values( | ||||
self.cursor, | self.cursor, | ||||
f""" | f""" | ||||
LOCK TABLE ONLY {entity}; | LOCK TABLE ONLY {entity}; | ||||
▲ Show 20 Lines • Show All 95 Lines • ▼ Show 20 Lines | def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: | ||||
SELECT O.sha1 | SELECT O.sha1 | ||||
FROM revision AS R | FROM revision AS R | ||||
JOIN origin as O | JOIN origin as O | ||||
ON R.origin=O.id | ON R.origin=O.id | ||||
WHERE R.sha1=%s""", | WHERE R.sha1=%s""", | ||||
(revision,), | (revision,), | ||||
) | ) | ||||
row = self.cursor.fetchone() | row = self.cursor.fetchone() | ||||
return row[0] if row is not None else None | return row["sha1"] if row is not None else None | ||||
def revision_in_history(self, revision: Sha1Git) -> bool: | def revision_in_history(self, revision: Sha1Git) -> bool: | ||||
self.cursor.execute( | self.cursor.execute( | ||||
""" | """ | ||||
SELECT 1 | SELECT 1 | ||||
FROM revision_before_revision | FROM revision_before_revision | ||||
JOIN revision | JOIN revision | ||||
ON revision.id=revision_before_revision.prev | ON revision.id=revision_before_revision.prev | ||||
Show All 35 Lines |
I'm not convinced using a RealDictCursor for all queries really helps here, but meh.