Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenance.py
Show All 17 Lines | |||||
from ..interface import ( | from ..interface import ( | ||||
EntityType, | EntityType, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
LOGGER = logging.getLogger(__name__) | |||||
class ProvenanceStoragePostgreSql: | class ProvenanceStoragePostgreSql: | ||||
def __init__( | def __init__( | ||||
self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False | self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False | ||||
) -> None: | ) -> None: | ||||
BaseDb.adapt_conn(conn) | BaseDb.adapt_conn(conn) | ||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | ||||
conn.set_session(autocommit=True) | conn.set_session(autocommit=True) | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def location_get(self) -> Set[bytes]: | ||||
sql = "SELECT location.path AS path FROM location" | sql = "SELECT location.path AS path FROM location" | ||||
self.cursor.execute(sql) | self.cursor.execute(sql) | ||||
return {row["path"] for row in self.cursor.fetchall()} | return {row["path"] for row in self.cursor.fetchall()} | ||||
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | ||||
try: | try: | ||||
if urls: | if urls: | ||||
sql = """ | sql = """ | ||||
LOCK TABLE ONLY origin; | |||||
INSERT INTO origin(sha1, url) VALUES %s | INSERT INTO origin(sha1, url) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | ||||
urls: Dict[Sha1Git, str] = {} | urls: Dict[Sha1Git, str] = {} | ||||
sha1s = tuple(ids) | sha1s = tuple(ids) | ||||
if sha1s: | if sha1s: | ||||
Show All 10 Lines | class ProvenanceStoragePostgreSql: | ||||
def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | ||||
return self._entity_set_date("revision", dates) | return self._entity_set_date("revision", dates) | ||||
def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | ||||
try: | try: | ||||
if origins: | if origins: | ||||
sql = """ | sql = """ | ||||
LOCK TABLE ONLY revision; | |||||
INSERT INTO revision(sha1, origin) | INSERT INTO revision(sha1, origin) | ||||
(SELECT V.rev AS sha1, O.id AS origin | (SELECT V.rev AS sha1, O.id AS origin | ||||
FROM (VALUES %s) AS V(rev, org) | FROM (VALUES %s) AS V(rev, org) | ||||
JOIN origin AS O ON (O.sha1=V.org)) | JOIN origin AS O ON (O.sha1=V.org)) | ||||
ON CONFLICT (sha1) DO | ON CONFLICT (sha1) DO | ||||
UPDATE SET origin=EXCLUDED.origin | UPDATE SET origin=EXCLUDED.origin | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, origins.items()) | psycopg2.extras.execute_values(self.cursor, sql, origins.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | ||||
result: Dict[Sha1Git, RevisionData] = {} | result: Dict[Sha1Git, RevisionData] = {} | ||||
sha1s = tuple(ids) | sha1s = tuple(ids) | ||||
if sha1s: | if sha1s: | ||||
Show All 21 Lines | ) -> bool: | ||||
rel_table = relation.value | rel_table = relation.value | ||||
src_table, *_, dst_table = rel_table.split("_") | src_table, *_, dst_table = rel_table.split("_") | ||||
if src_table != "origin": | if src_table != "origin": | ||||
# Origin entries should be inserted previously as they require extra | # Origin entries should be inserted previously as they require extra | ||||
# non-null information | # non-null information | ||||
srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) | srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) | ||||
sql = f""" | sql = f""" | ||||
LOCK TABLE ONLY {src_table}; | |||||
INSERT INTO {src_table}(sha1) VALUES %s | INSERT INTO {src_table}(sha1) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, srcs) | psycopg2.extras.execute_values(self.cursor, sql, srcs) | ||||
if dst_table != "origin": | if dst_table != "origin": | ||||
# Origin entries should be inserted previously as they require extra | # Origin entries should be inserted previously as they require extra | ||||
# non-null information | # non-null information | ||||
dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) | dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) | ||||
sql = f""" | sql = f""" | ||||
LOCK TABLE ONLY {dst_table}; | |||||
INSERT INTO {dst_table}(sha1) VALUES %s | INSERT INTO {dst_table}(sha1) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, dsts) | psycopg2.extras.execute_values(self.cursor, sql, dsts) | ||||
# Put the next three queries in a manual single transaction: | # Put the next three queries in a manual single transaction: | ||||
# they use the same temp table | # they use the same temp table | ||||
with self.conn: | with self.conn: | ||||
with self.conn.cursor() as cur: | with self.conn.cursor() as cur: | ||||
cur.execute("SELECT swh_mktemp_relation_add()") | cur.execute("SELECT swh_mktemp_relation_add()") | ||||
psycopg2.extras.execute_values( | psycopg2.extras.execute_values( | ||||
cur, | cur, | ||||
sql=( | sql=( | ||||
"INSERT INTO tmp_relation_add (src, dst, path) " | "INSERT INTO tmp_relation_add (src, dst, path) " | ||||
"VALUES %s" | "VALUES %s" | ||||
), | ), | ||||
argslist=rows, | argslist=rows, | ||||
) | ) | ||||
sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)" | sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)" | ||||
cur.execute(sql, (rel_table, src_table, dst_table)) | cur.execute(sql, (rel_table, src_table, dst_table)) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def relation_get( | def relation_get( | ||||
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | ||||
) -> Set[RelationData]: | ) -> Set[RelationData]: | ||||
return self._relation_get(relation, ids, reverse) | return self._relation_get(relation, ids, reverse) | ||||
Show All 24 Lines | class ProvenanceStoragePostgreSql: | ||||
def _entity_set_date( | def _entity_set_date( | ||||
self, | self, | ||||
entity: Literal["content", "directory", "revision"], | entity: Literal["content", "directory", "revision"], | ||||
data: Dict[Sha1Git, datetime], | data: Dict[Sha1Git, datetime], | ||||
) -> bool: | ) -> bool: | ||||
try: | try: | ||||
if data: | if data: | ||||
sql = f""" | sql = f""" | ||||
LOCK TABLE ONLY {entity}; | |||||
INSERT INTO {entity}(sha1, date) VALUES %s | INSERT INTO {entity}(sha1, date) VALUES %s | ||||
ON CONFLICT (sha1) DO | ON CONFLICT (sha1) DO | ||||
UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, data.items()) | psycopg2.extras.execute_values(self.cursor, sql, data.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def _relation_get( | def _relation_get( | ||||
self, | self, | ||||
relation: RelationType, | relation: RelationType, | ||||
ids: Optional[Iterable[Sha1Git]], | ids: Optional[Iterable[Sha1Git]], | ||||
Show All 23 Lines |