Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenance.py
- This file was moved from swh/provenance/postgresql/provenancedb.py.
Show All 18 Lines | from ..interface import ( | ||||
EntityType, | EntityType, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
class ProvenanceDB: | class ProvenanceStoragePostgreSql: | ||||
def __init__( | def __init__( | ||||
self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False | self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False | ||||
) -> None: | ) -> None: | ||||
BaseDb.adapt_conn(conn) | BaseDb.adapt_conn(conn) | ||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | ||||
conn.set_session(autocommit=True) | conn.set_session(autocommit=True) | ||||
self.conn = conn | self.conn = conn | ||||
self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | ||||
Show All 32 Lines | def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
return self._entity_get_date("directory", ids) | return self._entity_get_date("directory", ids) | ||||
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | ||||
sql = f"SELECT sha1 FROM {entity.value}" | sql = f"SELECT sha1 FROM {entity.value}" | ||||
self.cursor.execute(sql) | self.cursor.execute(sql) | ||||
return {row["sha1"] for row in self.cursor.fetchall()} | return {row["sha1"] for row in self.cursor.fetchall()} | ||||
def location_get(self) -> Set[bytes]: | def location_get(self) -> Set[bytes]: | ||||
sql = "SELECT encode(location.path::bytea, 'escape') AS path FROM location" | sql = "SELECT location.path AS path FROM location" | ||||
self.cursor.execute(sql) | self.cursor.execute(sql) | ||||
return {row["path"] for row in self.cursor.fetchall()} | return {row["path"] for row in self.cursor.fetchall()} | ||||
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | ||||
try: | try: | ||||
if urls: | if urls: | ||||
sql = """ | sql = """ | ||||
LOCK TABLE ONLY origin; | LOCK TABLE ONLY origin; | ||||
INSERT INTO origin(sha1, url) VALUES %s | INSERT INTO origin(sha1, url) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | logging.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | ||||
urls: Dict[Sha1Git, str] = {} | urls: Dict[Sha1Git, str] = {} | ||||
sha1s = tuple(ids) | sha1s = tuple(ids) | ||||
if sha1s: | if sha1s: | ||||
# TODO: consider splitting this query in several ones if sha1s is too big! | |||||
values = ", ".join(itertools.repeat("%s", len(sha1s))) | values = ", ".join(itertools.repeat("%s", len(sha1s))) | ||||
sql = f""" | sql = f""" | ||||
SELECT sha1, url | SELECT sha1, url | ||||
FROM origin | FROM origin | ||||
WHERE sha1 IN ({values}) | WHERE sha1 IN ({values}) | ||||
""" | """ | ||||
self.cursor.execute(sql, sha1s) | self.cursor.execute(sql, sha1s) | ||||
urls.update( | urls.update( | ||||
Show All 37 Lines | def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
return False | return False | ||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | ||||
result: Dict[Sha1Git, RevisionData] = {} | result: Dict[Sha1Git, RevisionData] = {} | ||||
sha1s = tuple(ids) | sha1s = tuple(ids) | ||||
if sha1s: | if sha1s: | ||||
# TODO: consider splitting this query in several ones if sha1s is too big! | |||||
values = ", ".join(itertools.repeat("%s", len(sha1s))) | values = ", ".join(itertools.repeat("%s", len(sha1s))) | ||||
sql = f""" | sql = f""" | ||||
SELECT sha1, date, origin | SELECT R.sha1, R.date, O.sha1 AS origin | ||||
FROM revision | FROM revision AS R | ||||
WHERE sha1 IN ({values}) | LEFT JOIN origin AS O ON (O.id=R.origin) | ||||
WHERE R.sha1 IN ({values}) | |||||
""" | """ | ||||
self.cursor.execute(sql, sha1s) | self.cursor.execute(sql, sha1s) | ||||
result.update( | result.update( | ||||
(row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) | (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) | ||||
for row in self.cursor.fetchall() | for row in self.cursor.fetchall() | ||||
) | ) | ||||
return result | return result | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | ) -> bool: | ||||
*joins, | *joins, | ||||
] | ] | ||||
if self.denormalized and relation not in nope: | if self.denormalized and relation not in nope: | ||||
sql_l.append("GROUP BY S.id") | sql_l.append("GROUP BY S.id") | ||||
sql_l.append( | sql_l.append( | ||||
f"""ON CONFLICT ({src}) DO UPDATE | f"""ON CONFLICT ({src}) DO UPDATE | ||||
SET {dst}=ARRAY( | SET {dst}=ARRAY( | ||||
SELECT UNNEST({table}.{dst} || excluded.{dst})), | SELECT UNNEST({table}.{dst} || EXCLUDED.{dst}) | ||||
location=ARRAY( | ), location=ARRAY( | ||||
SELECT UNNEST({relation.value}.location || excluded.location)) | SELECT UNNEST({relation.value}.location || EXCLUDED.location) | ||||
) | |||||
""" | """ | ||||
) | ) | ||||
else: | else: | ||||
sql_l.append("ON CONFLICT DO NOTHING") | sql_l.append("ON CONFLICT DO NOTHING") | ||||
sql = "\n".join(sql_l) | sql = "\n".join(sql_l) | ||||
psycopg2.extras.execute_values(self.cursor, sql, rows) | psycopg2.extras.execute_values(self.cursor, sql, rows) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
Show All 13 Lines | class ProvenanceStoragePostgreSql: | ||||
def _entity_get_date( | def _entity_get_date( | ||||
self, | self, | ||||
entity: Literal["content", "directory", "revision"], | entity: Literal["content", "directory", "revision"], | ||||
ids: Iterable[Sha1Git], | ids: Iterable[Sha1Git], | ||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
dates: Dict[Sha1Git, datetime] = {} | dates: Dict[Sha1Git, datetime] = {} | ||||
sha1s = tuple(ids) | sha1s = tuple(ids) | ||||
if sha1s: | if sha1s: | ||||
# TODO: consider splitting this query in several ones if sha1s is too big! | |||||
values = ", ".join(itertools.repeat("%s", len(sha1s))) | values = ", ".join(itertools.repeat("%s", len(sha1s))) | ||||
sql = f""" | sql = f""" | ||||
SELECT sha1, date | SELECT sha1, date | ||||
FROM {entity} | FROM {entity} | ||||
WHERE sha1 IN ({values}) | WHERE sha1 IN ({values}) | ||||
""" | """ | ||||
self.cursor.execute(sql, sha1s) | self.cursor.execute(sql, sha1s) | ||||
dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) | dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) | ||||
▲ Show 20 Lines • Show All 97 Lines • Show Last 20 Lines |