Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenance.py
Show All 17 Lines | |||||
from ..interface import ( | from ..interface import ( | ||||
EntityType, | EntityType, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
LOGGER = logging.getLogger(__name__) | |||||
class ProvenanceStoragePostgreSql: | class ProvenanceStoragePostgreSql: | ||||
def __init__( | def __init__( | ||||
self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False | self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False | ||||
) -> None: | ) -> None: | ||||
BaseDb.adapt_conn(conn) | BaseDb.adapt_conn(conn) | ||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | ||||
conn.set_session(autocommit=True) | conn.set_session(autocommit=True) | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | ||||
LOCK TABLE ONLY origin; | LOCK TABLE ONLY origin; | ||||
INSERT INTO origin(sha1, url) VALUES %s | INSERT INTO origin(sha1, url) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | ||||
urls: Dict[Sha1Git, str] = {} | urls: Dict[Sha1Git, str] = {} | ||||
sha1s = tuple(ids) | sha1s = tuple(ids) | ||||
if sha1s: | if sha1s: | ||||
Show All 22 Lines | def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | ||||
JOIN origin AS O ON (O.sha1=V.org)) | JOIN origin AS O ON (O.sha1=V.org)) | ||||
ON CONFLICT (sha1) DO | ON CONFLICT (sha1) DO | ||||
UPDATE SET origin=EXCLUDED.origin | UPDATE SET origin=EXCLUDED.origin | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, origins.items()) | psycopg2.extras.execute_values(self.cursor, sql, origins.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | ||||
result: Dict[Sha1Git, RevisionData] = {} | result: Dict[Sha1Git, RevisionData] = {} | ||||
sha1s = tuple(ids) | sha1s = tuple(ids) | ||||
if sha1s: | if sha1s: | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | ) -> bool: | ||||
), | ), | ||||
argslist=rows, | argslist=rows, | ||||
) | ) | ||||
sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)" | sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)" | ||||
cur.execute(sql, (rel_table, src_table, dst_table)) | cur.execute(sql, (rel_table, src_table, dst_table)) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def relation_get( | def relation_get( | ||||
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | ||||
) -> Set[RelationData]: | ) -> Set[RelationData]: | ||||
return self._relation_get(relation, ids, reverse) | return self._relation_get(relation, ids, reverse) | ||||
Show All 33 Lines | ) -> bool: | ||||
INSERT INTO {entity}(sha1, date) VALUES %s | INSERT INTO {entity}(sha1, date) VALUES %s | ||||
ON CONFLICT (sha1) DO | ON CONFLICT (sha1) DO | ||||
UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, data.items()) | psycopg2.extras.execute_values(self.cursor, sql, data.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | LOGGER.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def _relation_get( | def _relation_get( | ||||
self, | self, | ||||
relation: RelationType, | relation: RelationType, | ||||
ids: Optional[Iterable[Sha1Git]], | ids: Optional[Iterable[Sha1Git]], | ||||
Show All 23 Lines |