Page Menu · Home · Software Heritage

D6734.diff
No One · Temporary

D6734.diff

diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py
--- a/swh/provenance/postgresql/provenance.py
+++ b/swh/provenance/postgresql/provenance.py
@@ -7,6 +7,7 @@
from contextlib import contextmanager
from datetime import datetime
+from functools import wraps
import itertools
import logging
from types import TracebackType
@@ -34,6 +35,21 @@
STORAGE_DURATION_METRIC = "swh_provenance_storage_postgresql_duration_seconds"
+def handle_raise_on_commit(f):
+ @wraps(f)
+ def handle(self, *args, **kwargs):
+ try:
+ return f(self, *args, **kwargs)
+ except BaseException as ex:
+ # Unexpected error occurred, rollback all changes and log message
+ LOGGER.exception("Unexpected error")
+ if self.raise_on_commit:
+ raise ex
+ return False
+
+ return handle
+
+
class ProvenanceStoragePostgreSql:
def __init__(
self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs
@@ -82,26 +98,20 @@
self.conn.close()
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
+ @handle_raise_on_commit
def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool:
- try:
- if cnts:
- sql = """
- INSERT INTO content(sha1, date) VALUES %s
- ON CONFLICT (sha1) DO
- UPDATE SET date=LEAST(EXCLUDED.date,content.date)
- """
- page_size = self.page_size or len(cnts)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cursor, sql, argslist=cnts.items(), page_size=page_size
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if cnts:
+ sql = """
+ INSERT INTO content(sha1, date) VALUES %s
+ ON CONFLICT (sha1) DO
+ UPDATE SET date=LEAST(EXCLUDED.date,content.date)
+ """
+ page_size = self.page_size or len(cnts)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cursor, sql, argslist=cnts.items(), page_size=page_size
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"})
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
@@ -139,29 +149,23 @@
return dates
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
+ @handle_raise_on_commit
def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
data = [(sha1, rev.date, rev.flat) for sha1, rev in dirs.items()]
- try:
- if data:
- sql = """
- INSERT INTO directory(sha1, date, flat) VALUES %s
- ON CONFLICT (sha1) DO
- UPDATE SET
- date=LEAST(EXCLUDED.date, directory.date),
- flat=(EXCLUDED.flat OR directory.flat)
- """
- page_size = self.page_size or len(data)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cur=cursor, sql=sql, argslist=data, page_size=page_size
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if data:
+ sql = """
+ INSERT INTO directory(sha1, date, flat) VALUES %s
+ ON CONFLICT (sha1) DO
+ UPDATE SET
+ date=LEAST(EXCLUDED.date, directory.date),
+ flat=(EXCLUDED.flat OR directory.flat)
+ """
+ page_size = self.page_size or len(data)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cur=cursor, sql=sql, argslist=data, page_size=page_size
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"})
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]:
@@ -191,10 +195,9 @@
return {row["sha1"] for row in cursor}
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
+ @handle_raise_on_commit
def location_add(self, paths: Iterable[bytes]) -> bool:
- if not self.with_path():
- return True
- try:
+ if self.with_path():
values = [(path,) for path in paths]
if values:
sql = """
@@ -206,13 +209,7 @@
psycopg2.extras.execute_values(
cursor, sql, argslist=values, page_size=page_size
)
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"})
def location_get_all(self) -> Set[bytes]:
@@ -221,28 +218,22 @@
return {row["path"] for row in cursor}
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
+ @handle_raise_on_commit
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
- try:
- if orgs:
- sql = """
- INSERT INTO origin(sha1, url) VALUES %s
- ON CONFLICT DO NOTHING
- """
- page_size = self.page_size or len(orgs)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cur=cursor,
- sql=sql,
- argslist=orgs.items(),
- page_size=page_size,
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if orgs:
+ sql = """
+ INSERT INTO origin(sha1, url) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ page_size = self.page_size or len(orgs)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cur=cursor,
+ sql=sql,
+ argslist=orgs.items(),
+ page_size=page_size,
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"})
def open(self) -> None:
@@ -269,6 +260,7 @@
return urls
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"})
+ @handle_raise_on_commit
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
@@ -276,30 +268,23 @@
data = [(sha1, rev.date, rev.origin) for sha1, rev in revs.items()]
else:
data = [(sha1, None, None) for sha1 in revs]
- try:
- if data:
- sql = """
- INSERT INTO revision(sha1, date, origin)
- (SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin
- FROM (VALUES %s) AS V(rev, date, org)
- LEFT JOIN origin AS O ON (O.sha1=V.org::sha1_git))
- ON CONFLICT (sha1) DO
- UPDATE SET
- date=LEAST(EXCLUDED.date, revision.date),
- origin=COALESCE(EXCLUDED.origin, revision.origin)
- """
- page_size = self.page_size or len(data)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cur=cursor, sql=sql, argslist=data, page_size=page_size
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if data:
+ sql = """
+ INSERT INTO revision(sha1, date, origin)
+ (SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin
+ FROM (VALUES %s) AS V(rev, date, org)
+ LEFT JOIN origin AS O ON (O.sha1=V.org::sha1_git))
+ ON CONFLICT (sha1) DO
+ UPDATE SET
+ date=LEAST(EXCLUDED.date, revision.date),
+ origin=COALESCE(EXCLUDED.origin, revision.origin)
+ """
+ page_size = self.page_size or len(data)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cur=cursor, sql=sql, argslist=data, page_size=page_size
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"})
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
@@ -324,34 +309,28 @@
return result
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
+ @handle_raise_on_commit
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
rows = [(src, rel.dst, rel.path) for src, dsts in data.items() for rel in dsts]
- try:
- if rows:
- rel_table = relation.value
- src_table, *_, dst_table = rel_table.split("_")
- page_size = self.page_size or len(rows)
- # Put the next three queries in a manual single transaction:
- # they use the same temp table
- with self.transaction() as cursor:
- cursor.execute("SELECT swh_mktemp_relation_add()")
- psycopg2.extras.execute_values(
- cur=cursor,
- sql="INSERT INTO tmp_relation_add(src, dst, path) VALUES %s",
- argslist=rows,
- page_size=page_size,
- )
- sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)"
- cursor.execute(query=sql, vars=(rel_table, src_table, dst_table))
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if rows:
+ rel_table = relation.value
+ src_table, *_, dst_table = rel_table.split("_")
+ page_size = self.page_size or len(rows)
+ # Put the next three queries in a manual single transaction:
+ # they use the same temp table
+ with self.transaction() as cursor:
+ cursor.execute("SELECT swh_mktemp_relation_add()")
+ psycopg2.extras.execute_values(
+ cur=cursor,
+ sql="INSERT INTO tmp_relation_add(src, dst, path) VALUES %s",
+ argslist=rows,
+ page_size=page_size,
+ )
+ sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)"
+ cursor.execute(query=sql, vars=(rel_table, src_table, dst_table))
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"})
def relation_get(

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 6:10 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223551

Event Timeline