Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124008
D6734.id24463.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D6734.id24463.diff
View Options
diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py
--- a/swh/provenance/postgresql/provenance.py
+++ b/swh/provenance/postgresql/provenance.py
@@ -7,6 +7,7 @@
from contextlib import contextmanager
from datetime import datetime
+from functools import wraps
import itertools
import logging
from types import TracebackType
@@ -34,6 +35,21 @@
STORAGE_DURATION_METRIC = "swh_provenance_storage_postgresql_duration_seconds"
+def handle_raise_on_commit(f):
+    """Log ``f``'s errors, then re-raise or return False per ``raise_on_commit``."""
+    @wraps(f)
+    def handle(self, *args, **kwargs):
+        try:
+            return f(self, *args, **kwargs)
+        except Exception:  # never swallow KeyboardInterrupt/SystemExit
+            LOGGER.exception("Unexpected error")
+            if self.raise_on_commit:
+                raise  # re-raise the active exception as-is
+            return False
+
+    return handle
+
+
class ProvenanceStoragePostgreSql:
def __init__(
self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs
@@ -82,26 +98,20 @@
self.conn.close()
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
+ @handle_raise_on_commit
def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool:
- try:
- if cnts:
- sql = """
- INSERT INTO content(sha1, date) VALUES %s
- ON CONFLICT (sha1) DO
- UPDATE SET date=LEAST(EXCLUDED.date,content.date)
- """
- page_size = self.page_size or len(cnts)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cursor, sql, argslist=cnts.items(), page_size=page_size
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if cnts:
+ sql = """
+ INSERT INTO content(sha1, date) VALUES %s
+ ON CONFLICT (sha1) DO
+ UPDATE SET date=LEAST(EXCLUDED.date,content.date)
+ """
+ page_size = self.page_size or len(cnts)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cursor, sql, argslist=cnts.items(), page_size=page_size
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"})
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
@@ -139,29 +149,23 @@
return dates
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
+ @handle_raise_on_commit
def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
data = [(sha1, rev.date, rev.flat) for sha1, rev in dirs.items()]
- try:
- if data:
- sql = """
- INSERT INTO directory(sha1, date, flat) VALUES %s
- ON CONFLICT (sha1) DO
- UPDATE SET
- date=LEAST(EXCLUDED.date, directory.date),
- flat=(EXCLUDED.flat OR directory.flat)
- """
- page_size = self.page_size or len(data)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cur=cursor, sql=sql, argslist=data, page_size=page_size
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if data:
+ sql = """
+ INSERT INTO directory(sha1, date, flat) VALUES %s
+ ON CONFLICT (sha1) DO
+ UPDATE SET
+ date=LEAST(EXCLUDED.date, directory.date),
+ flat=(EXCLUDED.flat OR directory.flat)
+ """
+ page_size = self.page_size or len(data)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cur=cursor, sql=sql, argslist=data, page_size=page_size
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"})
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]:
@@ -191,10 +195,9 @@
return {row["sha1"] for row in cursor}
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
+ @handle_raise_on_commit
def location_add(self, paths: Iterable[bytes]) -> bool:
- if not self.with_path():
- return True
- try:
+ if self.with_path():
values = [(path,) for path in paths]
if values:
sql = """
@@ -206,13 +209,7 @@
psycopg2.extras.execute_values(
cursor, sql, argslist=values, page_size=page_size
)
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"})
def location_get_all(self) -> Set[bytes]:
@@ -221,28 +218,22 @@
return {row["path"] for row in cursor}
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
+ @handle_raise_on_commit
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
- try:
- if orgs:
- sql = """
- INSERT INTO origin(sha1, url) VALUES %s
- ON CONFLICT DO NOTHING
- """
- page_size = self.page_size or len(orgs)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cur=cursor,
- sql=sql,
- argslist=orgs.items(),
- page_size=page_size,
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if orgs:
+ sql = """
+ INSERT INTO origin(sha1, url) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ page_size = self.page_size or len(orgs)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cur=cursor,
+ sql=sql,
+ argslist=orgs.items(),
+ page_size=page_size,
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"})
def open(self) -> None:
@@ -269,6 +260,7 @@
return urls
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"})
+ @handle_raise_on_commit
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
@@ -276,30 +268,23 @@
data = [(sha1, rev.date, rev.origin) for sha1, rev in revs.items()]
else:
data = [(sha1, None, None) for sha1 in revs]
- try:
- if data:
- sql = """
- INSERT INTO revision(sha1, date, origin)
- (SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin
- FROM (VALUES %s) AS V(rev, date, org)
- LEFT JOIN origin AS O ON (O.sha1=V.org::sha1_git))
- ON CONFLICT (sha1) DO
- UPDATE SET
- date=LEAST(EXCLUDED.date, revision.date),
- origin=COALESCE(EXCLUDED.origin, revision.origin)
- """
- page_size = self.page_size or len(data)
- with self.transaction() as cursor:
- psycopg2.extras.execute_values(
- cur=cursor, sql=sql, argslist=data, page_size=page_size
- )
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if data:
+ sql = """
+ INSERT INTO revision(sha1, date, origin)
+ (SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin
+ FROM (VALUES %s) AS V(rev, date, org)
+ LEFT JOIN origin AS O ON (O.sha1=V.org::sha1_git))
+ ON CONFLICT (sha1) DO
+ UPDATE SET
+ date=LEAST(EXCLUDED.date, revision.date),
+ origin=COALESCE(EXCLUDED.origin, revision.origin)
+ """
+ page_size = self.page_size or len(data)
+ with self.transaction() as cursor:
+ psycopg2.extras.execute_values(
+ cur=cursor, sql=sql, argslist=data, page_size=page_size
+ )
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"})
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
@@ -324,34 +309,28 @@
return result
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
+ @handle_raise_on_commit
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
rows = [(src, rel.dst, rel.path) for src, dsts in data.items() for rel in dsts]
- try:
- if rows:
- rel_table = relation.value
- src_table, *_, dst_table = rel_table.split("_")
- page_size = self.page_size or len(rows)
- # Put the next three queries in a manual single transaction:
- # they use the same temp table
- with self.transaction() as cursor:
- cursor.execute("SELECT swh_mktemp_relation_add()")
- psycopg2.extras.execute_values(
- cur=cursor,
- sql="INSERT INTO tmp_relation_add(src, dst, path) VALUES %s",
- argslist=rows,
- page_size=page_size,
- )
- sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)"
- cursor.execute(query=sql, vars=(rel_table, src_table, dst_table))
- return True
- except: # noqa: E722
- # Unexpected error occurred, rollback all changes and log message
- LOGGER.exception("Unexpected error")
- if self.raise_on_commit:
- raise
- return False
+ if rows:
+ rel_table = relation.value
+ src_table, *_, dst_table = rel_table.split("_")
+ page_size = self.page_size or len(rows)
+ # Put the next three queries in a manual single transaction:
+ # they use the same temp table
+ with self.transaction() as cursor:
+ cursor.execute("SELECT swh_mktemp_relation_add()")
+ psycopg2.extras.execute_values(
+ cur=cursor,
+ sql="INSERT INTO tmp_relation_add(src, dst, path) VALUES %s",
+ argslist=rows,
+ page_size=page_size,
+ )
+ sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)"
+ cursor.execute(query=sql, vars=(rel_table, src_table, dst_table))
+ return True
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"})
def relation_get(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 20 2024, 9:07 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223551
Attached To
D6734: Refactor `raise_on_commit` logic with a decorator
Event Timeline
Log In to Comment