Changeset View
Changeset View
Standalone View
Standalone View
swh/core/db/db_utils.py
Show All 20 Lines | |||||
def stored_procedure(stored_proc): | def stored_procedure(stored_proc): | ||||
"""decorator to execute remote stored procedure, specified as argument | """decorator to execute remote stored procedure, specified as argument | ||||
Generally, the body of the decorated function should be empty. If it is | Generally, the body of the decorated function should be empty. If it is | ||||
not, the stored procedure will be executed first; the function body then. | not, the stored procedure will be executed first; the function body then. | ||||
""" | """ | ||||
def wrap(meth): | def wrap(meth): | ||||
@functools.wraps(meth) | @functools.wraps(meth) | ||||
def _meth(self, *args, **kwargs): | def _meth(self, *args, **kwargs): | ||||
cur = kwargs.get('cur', None) | cur = kwargs.get("cur", None) | ||||
self._cursor(cur).execute('SELECT %s()' % stored_proc) | self._cursor(cur).execute("SELECT %s()" % stored_proc) | ||||
meth(self, *args, **kwargs) | meth(self, *args, **kwargs) | ||||
return _meth | return _meth | ||||
return wrap | return wrap | ||||
def jsonize(value): | def jsonize(value): | ||||
"""Convert a value to a psycopg2 JSON object if necessary""" | """Convert a value to a psycopg2 JSON object if necessary""" | ||||
if isinstance(value, dict): | if isinstance(value, dict): | ||||
return psycopg2.extras.Json(value) | return psycopg2.extras.Json(value) | ||||
Show All 20 Lines | |||||
def _split_sql(sql): | def _split_sql(sql): | ||||
"""Split *sql* on a single ``%s`` placeholder. | """Split *sql* on a single ``%s`` placeholder. | ||||
Split on the %s, perform %% replacement and return pre, post lists of | Split on the %s, perform %% replacement and return pre, post lists of | ||||
snippets. | snippets. | ||||
""" | """ | ||||
curr = pre = [] | curr = pre = [] | ||||
post = [] | post = [] | ||||
tokens = re.split(br'(%.)', sql) | tokens = re.split(br"(%.)", sql) | ||||
for token in tokens: | for token in tokens: | ||||
if len(token) != 2 or token[:1] != b'%': | if len(token) != 2 or token[:1] != b"%": | ||||
curr.append(token) | curr.append(token) | ||||
continue | continue | ||||
if token[1:] == b's': | if token[1:] == b"s": | ||||
if curr is pre: | if curr is pre: | ||||
curr = post | curr = post | ||||
else: | else: | ||||
raise ValueError( | raise ValueError("the query contains more than one '%s' placeholder") | ||||
"the query contains more than one '%s' placeholder") | elif token[1:] == b"%": | ||||
elif token[1:] == b'%': | curr.append(b"%") | ||||
curr.append(b'%') | |||||
else: | else: | ||||
raise ValueError("unsupported format character: '%s'" | raise ValueError( | ||||
% token[1:].decode('ascii', 'replace')) | "unsupported format character: '%s'" | ||||
% token[1:].decode("ascii", "replace") | |||||
) | |||||
if curr is pre: | if curr is pre: | ||||
raise ValueError("the query doesn't contain any '%s' placeholder") | raise ValueError("the query doesn't contain any '%s' placeholder") | ||||
return pre, post | return pre, post | ||||
def execute_values_generator(cur, sql, argslist, template=None, page_size=100): | def execute_values_generator(cur, sql, argslist, template=None, page_size=100): | ||||
'''Execute a statement using SQL ``VALUES`` with a sequence of parameters. | """Execute a statement using SQL ``VALUES`` with a sequence of parameters. | ||||
Rows returned by the query are returned through a generator. | Rows returned by the query are returned through a generator. | ||||
You need to consume the generator for the queries to be executed! | You need to consume the generator for the queries to be executed! | ||||
:param cur: the cursor to use to execute the query. | :param cur: the cursor to use to execute the query. | ||||
:param sql: the query to execute. It must contain a single ``%s`` | :param sql: the query to execute. It must contain a single ``%s`` | ||||
placeholder, which will be replaced by a `VALUES list`__. | placeholder, which will be replaced by a `VALUES list`__. | ||||
Example: ``"INSERT INTO mytable (id, f1, f2) VALUES %s"``. | Example: ``"INSERT INTO mytable (id, f1, f2) VALUES %s"``. | ||||
:param argslist: sequence of sequences or dictionaries with the arguments | :param argslist: sequence of sequences or dictionaries with the arguments | ||||
Show All 16 Lines | :param page_size: maximum number of *argslist* items to include in every | ||||
one statement. | one statement. | ||||
:param yield_from_cur: Whether to yield results from the cursor in this | :param yield_from_cur: Whether to yield results from the cursor in this | ||||
function directly. | function directly. | ||||
.. __: https://www.postgresql.org/docs/current/static/queries-values.html | .. __: https://www.postgresql.org/docs/current/static/queries-values.html | ||||
After the execution of the function the `cursor.rowcount` property will | After the execution of the function the `cursor.rowcount` property will | ||||
**not** contain a total result. | **not** contain a total result. | ||||
''' | """ | ||||
# we can't just use sql % vals because vals is bytes: if sql is bytes | # we can't just use sql % vals because vals is bytes: if sql is bytes | ||||
# there will be some decoding error because of stupid codec used, and Py3 | # there will be some decoding error because of stupid codec used, and Py3 | ||||
# doesn't implement % on bytes. | # doesn't implement % on bytes. | ||||
if not isinstance(sql, bytes): | if not isinstance(sql, bytes): | ||||
sql = sql.encode( | sql = sql.encode(psycopg2.extensions.encodings[cur.connection.encoding]) | ||||
psycopg2.extensions.encodings[cur.connection.encoding] | |||||
) | |||||
pre, post = _split_sql(sql) | pre, post = _split_sql(sql) | ||||
for page in _paginate(argslist, page_size=page_size): | for page in _paginate(argslist, page_size=page_size): | ||||
if template is None: | if template is None: | ||||
template = b'(' + b','.join([b'%s'] * len(page[0])) + b')' | template = b"(" + b",".join([b"%s"] * len(page[0])) + b")" | ||||
parts = pre[:] | parts = pre[:] | ||||
for args in page: | for args in page: | ||||
parts.append(cur.mogrify(template, args)) | parts.append(cur.mogrify(template, args)) | ||||
parts.append(b',') | parts.append(b",") | ||||
parts[-1:] = post | parts[-1:] = post | ||||
cur.execute(b''.join(parts)) | cur.execute(b"".join(parts)) | ||||
yield from cur | yield from cur |