Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenance.py
Show First 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | def location_get(self) -> Set[bytes]: | ||||
sql = "SELECT location.path AS path FROM location" | sql = "SELECT location.path AS path FROM location" | ||||
self.cursor.execute(sql) | self.cursor.execute(sql) | ||||
return {row["path"] for row in self.cursor.fetchall()} | return {row["path"] for row in self.cursor.fetchall()} | ||||
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | ||||
try: | try: | ||||
if urls: | if urls: | ||||
sql = """ | sql = """ | ||||
LOCK TABLE ONLY origin; | |||||
INSERT INTO origin(sha1, url) VALUES %s | INSERT INTO origin(sha1, url) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | psycopg2.extras.execute_values(self.cursor, sql, urls.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | logging.exception("Unexpected error") | ||||
Show All 18 Lines | class ProvenanceStoragePostgreSql: | ||||
def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | ||||
return self._entity_set_date("revision", dates) | return self._entity_set_date("revision", dates) | ||||
def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | ||||
try: | try: | ||||
if origins: | if origins: | ||||
sql = """ | sql = """ | ||||
LOCK TABLE ONLY revision; | |||||
INSERT INTO revision(sha1, origin) | INSERT INTO revision(sha1, origin) | ||||
(SELECT V.rev AS sha1, O.id AS origin | (SELECT V.rev AS sha1, O.id AS origin | ||||
FROM (VALUES %s) AS V(rev, org) | FROM (VALUES %s) AS V(rev, org) | ||||
JOIN origin AS O ON (O.sha1=V.org)) | JOIN origin AS O ON (O.sha1=V.org)) | ||||
ON CONFLICT (sha1) DO | ON CONFLICT (sha1) DO | ||||
UPDATE SET origin=EXCLUDED.origin | UPDATE SET origin=EXCLUDED.origin | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, origins.items()) | psycopg2.extras.execute_values(self.cursor, sql, origins.items()) | ||||
Show All 33 Lines | ) -> bool: | ||||
rel_table = relation.value | rel_table = relation.value | ||||
src_table, *_, dst_table = rel_table.split("_") | src_table, *_, dst_table = rel_table.split("_") | ||||
if src_table != "origin": | if src_table != "origin": | ||||
# Origin entries should be inserted previously as they require extra | # Origin entries should be inserted previously as they require extra | ||||
# non-null information | # non-null information | ||||
srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) | srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) | ||||
sql = f""" | sql = f""" | ||||
LOCK TABLE ONLY {src_table}; | |||||
INSERT INTO {src_table}(sha1) VALUES %s | INSERT INTO {src_table}(sha1) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, srcs) | psycopg2.extras.execute_values(self.cursor, sql, srcs) | ||||
if dst_table != "origin": | if dst_table != "origin": | ||||
# Origin entries should be inserted previously as they require extra | # Origin entries should be inserted previously as they require extra | ||||
# non-null information | # non-null information | ||||
dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) | dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) | ||||
sql = f""" | sql = f""" | ||||
LOCK TABLE ONLY {dst_table}; | |||||
INSERT INTO {dst_table}(sha1) VALUES %s | INSERT INTO {dst_table}(sha1) VALUES %s | ||||
ON CONFLICT DO NOTHING | ON CONFLICT DO NOTHING | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, dsts) | psycopg2.extras.execute_values(self.cursor, sql, dsts) | ||||
sql = """ | sql = """ | ||||
SELECT * FROM swh_provenance_relation_add( | SELECT * FROM swh_provenance_relation_add(%s, %s, %s, %s::rel_row[]) | ||||
%s, %s, %s, %s::rel_row[] | |||||
) | |||||
""" | """ | ||||
self.cursor.execute(sql, (rel_table, src_table, dst_table, rows)) | self.cursor.execute(sql, (rel_table, src_table, dst_table, rows)) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
logging.exception("Unexpected error") | logging.exception("Unexpected error") | ||||
if self.raise_on_commit: | if self.raise_on_commit: | ||||
raise | raise | ||||
Show All 30 Lines | class ProvenanceStoragePostgreSql: | ||||
def _entity_set_date( | def _entity_set_date( | ||||
self, | self, | ||||
entity: Literal["content", "directory", "revision"], | entity: Literal["content", "directory", "revision"], | ||||
data: Dict[Sha1Git, datetime], | data: Dict[Sha1Git, datetime], | ||||
) -> bool: | ) -> bool: | ||||
try: | try: | ||||
if data: | if data: | ||||
sql = f""" | sql = f""" | ||||
LOCK TABLE ONLY {entity}; | |||||
INSERT INTO {entity}(sha1, date) VALUES %s | INSERT INTO {entity}(sha1, date) VALUES %s | ||||
ON CONFLICT (sha1) DO | ON CONFLICT (sha1) DO | ||||
UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | ||||
""" | """ | ||||
psycopg2.extras.execute_values(self.cursor, sql, data.items()) | psycopg2.extras.execute_values(self.cursor, sql, data.items()) | ||||
return True | return True | ||||
except: # noqa: E722 | except: # noqa: E722 | ||||
# Unexpected error occurred, rollback all changes and log message | # Unexpected error occurred, rollback all changes and log message | ||||
Show All 32 Lines |