diff --git a/sql/swh-func.sql b/sql/swh-func.sql
--- a/sql/swh-func.sql
+++ b/sql/swh-func.sql
@@ -92,16 +92,6 @@
     alter table tmp_release drop column object_id;
 $$;
 
--- create a temporary table with a single "bytea" column for fast object lookup.
-create or replace function swh_mktemp_bytea()
-    returns void
-    language sql
-as $$
-    create temporary table tmp_bytea (
-      id bytea
-    ) on commit drop;
-$$;
-
 -- create a temporary table for occurrence_history
 create or replace function swh_mktemp_occurrence_history()
     returns void
@@ -173,46 +163,6 @@
 );
 
--- check which entries of tmp_content are missing from content
---
--- operates in bulk: 0. swh_mktemp(content), 1. COPY to tmp_content,
--- 2. call this function
-create or replace function swh_content_missing()
-    returns setof content_signature
-    language plpgsql
-as $$
-begin
-    return query (
-      select sha1, sha1_git, sha256, blake2s256 from tmp_content as tmp
-      where not exists (
-        select 1 from content as c
-        where c.sha1 = tmp.sha1 and
-              c.sha1_git = tmp.sha1_git and
-              c.sha256 = tmp.sha256
-      )
-    );
-    return;
-end
-$$;
-
--- check which entries of tmp_content_sha1 are missing from content
---
--- operates in bulk: 0. swh_mktemp_content_sha1(), 1. COPY to tmp_content_sha1,
--- 2. call this function
-create or replace function swh_content_missing_per_sha1()
-    returns setof sha1
-    language plpgsql
-as $$
-begin
-    return query
-           (select id::sha1
-            from tmp_bytea as tmp
-            where not exists
-            (select 1 from content as c where c.sha1 = tmp.id));
-end
-$$;
-
-
 -- check which entries of tmp_skipped_content are missing from skipped_content
 --
 -- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content,
 -- 2. call this function
@@ -292,15 +242,7 @@
 as $$
 begin
     insert into content (sha1, sha1_git, sha256, blake2s256, length, status)
-        select distinct sha1, sha1_git, sha256, blake2s256, length, status
-        from tmp_content
-        where (sha1, sha1_git, sha256) in (
-            select sha1, sha1_git, sha256
-            from swh_content_missing()
-        );
-        -- TODO XXX use postgres 9.5 "UPSERT" support here, when available.
-        -- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid
-        -- the extra swh_content_missing() query here.
+        select distinct sha1, sha1_git, sha256, blake2s256, length, status from tmp_content;
     return;
 end
 $$;
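Editor's note: with swh_content_missing() gone, swh_content_add() no longer filters tmp_content against the existing content table, so deduplication has to happen on the client before the COPY. A minimal sketch of that flow, assuming an swh.storage.db.Db instance and an open cursor; the add_new_contents helper and its column list are illustrative, not part of this patch:

```python
def add_new_contents(db, contents, cur=None):
    # Keep only contents whose hashes are not already present; the rewritten
    # Db.content_missing_from_list (see swh/storage/db.py below) yields one
    # (sha1, sha1_git, sha256, blake2s256) tuple per missing content.
    missing = set(db.content_missing_from_list(contents, cur))
    to_insert = [
        c for c in contents
        if tuple(c[k] for k in db.content_hash_keys) in missing
    ]

    # COPY the filtered rows into tmp_content, then let swh_content_add()
    # move them into `content` without re-checking for duplicates.
    db.mktemp('content', cur)
    db.copy_to(to_insert, 'tmp_content',
               db.content_hash_keys + ['length', 'status'], cur)
    db.content_add_from_temp(cur)  # assumed wrapper around swh_content_add()
```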
@@ -689,42 +631,6 @@
 $$;
 
--- Retrieve revisions from tmp_bytea in bulk
-create or replace function swh_revision_get()
-    returns setof revision_entry
-    language plpgsql
-as $$
-begin
-    return query
-        select r.id, r.date, r.date_offset, r.date_neg_utc_offset,
-               r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
-               r.type, r.directory, r.message,
-               a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic,
-               array(select rh.parent_id::bytea from revision_history rh where rh.id = t.id order by rh.parent_rank)
-                   as parents, r.object_id
-        from tmp_bytea t
-        left join revision r on t.id = r.id
-        left join person a on a.id = r.author
-        left join person c on c.id = r.committer;
-    return;
-end
-$$;
-
--- List missing revisions from tmp_bytea
-create or replace function swh_revision_missing()
-    returns setof sha1_git
-    language plpgsql
-as $$
-begin
-    return query
-        select id::sha1_git from tmp_bytea t
-        where not exists (
-            select 1 from revision r
-            where r.id = t.id);
-    return;
-end
-$$;
-
 -- Detailed entry for a release
 create type release_entry as (
@@ -744,22 +650,6 @@
   object_id   bigint
 );
 
--- Detailed entry for release
-create or replace function swh_release_get()
-    returns setof release_entry
-    language plpgsql
-as $$
-begin
-    return query
-        select r.id, r.target, r.target_type, r.date, r.date_offset, r.date_neg_utc_offset, r.name, r.comment,
-               r.synthetic, p.id as author_id, p.fullname as author_fullname, p.name as author_name, p.email as author_email, r.object_id
-        from tmp_bytea t
-        inner join release r on t.id = r.id
-        inner join person p on p.id = r.author;
-    return;
-end
-$$;
-
 -- Create entries in person from tmp_revision
 create or replace function swh_person_add_from_revision()
     returns void
@@ -800,21 +690,6 @@
 $$;
 
--- List missing releases from tmp_bytea
-create or replace function swh_release_missing()
-    returns setof sha1_git
-    language plpgsql
-as $$
-begin
-    return query
-        select id::sha1_git from tmp_bytea t
-        where not exists (
-            select 1 from release r
-            where r.id = t.id);
-end
-$$;
-
-
 -- Create entries in person from tmp_release
 create or replace function swh_person_add_from_release()
     returns void
@@ -1189,39 +1064,6 @@
     where occ.origin = origin_id and occ.target_type = 'revision' and r.target_type = 'revision';
 $$;
 
-
-create type object_found as (
-    sha1_git   sha1_git,
-    type       object_type,
-    id         bytea,     -- sha1 or sha1_git depending on object_type
-    object_id  bigint
-);
-
--- Find objects by sha1_git, return their type and their main identifier
-create or replace function swh_object_find_by_sha1_git()
-    returns setof object_found
-    language plpgsql
-as $$
-begin
-    return query
-    with known_objects as ((
-        select id as sha1_git, 'release'::object_type as type, id, object_id from release r
-        where exists (select 1 from tmp_bytea t where t.id = r.id)
-    ) union all (
-        select id as sha1_git, 'revision'::object_type as type, id, object_id from revision r
-        where exists (select 1 from tmp_bytea t where t.id = r.id)
-    ) union all (
-        select id as sha1_git, 'directory'::object_type as type, id, object_id from directory d
-        where exists (select 1 from tmp_bytea t where t.id = d.id)
-    ) union all (
-        select sha1_git as sha1_git, 'content'::object_type as type, sha1 as id, object_id from content c
-        where exists (select 1 from tmp_bytea t where t.id = c.sha1_git)
-    ))
-    select t.id::sha1_git as sha1_git, k.type, k.id, k.object_id
-    from tmp_bytea t
-    left join known_objects k on t.id = k.sha1_git;
-end
-$$;
-
 -- Create entries in entity_history from tmp_entity_history
 --
 -- TODO: do something smarter to compress the entries if the data
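Editor's note: every dropped function follows the same shape: load ids into tmp_bytea, then join or anti-join that temporary table against revision, release, directory or content. The Python side of this patch rebuilds the same queries with an inline VALUES list instead. For reference, a sketch of the revision case using stock psycopg2 (fetch=True on execute_values needs psycopg2 >= 2.8; the cursor and the bytea ids are assumed):

```python
from psycopg2.extras import execute_values

def revision_missing_sketch(cur, revision_ids):
    # Anti-join a client-supplied VALUES list against `revision`: this single
    # statement replaces swh_mktemp_bytea() + COPY + swh_revision_missing().
    rows = execute_values(
        cur,
        """
        SELECT id FROM (VALUES %s) AS t(id)
        WHERE NOT EXISTS (SELECT 1 FROM revision r WHERE r.id = t.id)
        """,
        [(rid,) for rid in revision_ids],
        fetch=True,
    )
    return [row[0] for row in rows]
```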
diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql
--- a/sql/swh-schema.sql
+++ b/sql/swh-schema.sql
@@ -14,7 +14,7 @@
 );
 
 insert into dbversion(version, release, description)
-      values(118, now(), 'Work In Progress');
+      values(119, now(), 'Work In Progress');
 
 -- a SHA1 checksum (not necessarily originating from Git)
 create domain sha1 as bytea check (length(value) = 20);
diff --git a/sql/upgrades/119.sql b/sql/upgrades/119.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/119.sql
@@ -0,0 +1,36 @@
+-- SWH DB schema upgrade
+-- from_version: 118
+-- to_version: 119
+-- description: Drop unused functions using temporary tables
+
+insert into dbversion(version, release, description)
+      values(119, now(), 'Work In Progress');
+
+CREATE OR REPLACE FUNCTION swh_content_add() RETURNS void
+    LANGUAGE plpgsql
+    AS $$
+begin
+    insert into content (sha1, sha1_git, sha256, blake2s256, length, status)
+        select distinct sha1, sha1_git, sha256, blake2s256, length, status from tmp_content;
+    return;
+end
+$$;
+
+DROP FUNCTION swh_content_missing_per_sha1();
+
+DROP FUNCTION swh_object_find_by_sha1_git();
+
+DROP FUNCTION swh_content_missing();
+
+DROP FUNCTION swh_release_get();
+
+DROP FUNCTION swh_release_missing();
+
+DROP FUNCTION swh_revision_get();
+
+DROP FUNCTION swh_revision_missing();
+
+DROP FUNCTION swh_mktemp_bytea();
+
+DROP TYPE object_found;
+
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -17,6 +17,7 @@
 import psycopg2
 import psycopg2.extras
 
+from .db_utils import execute_values_generator
 
 TMP_CONTENT_TABLE = 'tmp_content'
 
@@ -72,6 +73,11 @@
     yield from (line_to_bytes(line) for line in cursor)
 
 
+def execute_values_to_bytes(*args, **kwargs):
+    for line in execute_values_generator(*args, **kwargs):
+        yield line_to_bytes(line)
+
+
 class BaseDb:
     """Base class for swh.storage.*Db.
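Editor's note: the new execute_values_to_bytes helper combines execute_values_generator (added in swh/storage/db_utils.py below) with line_to_bytes, so callers stream rows with their buffer columns already converted to bytes. A hypothetical use, with the cursor and the sha1 list assumed:

```python
def content_lengths(cur, sha1s):
    # Yields (sha1, length) pairs for the sha1s present in `content`;
    # the single %s placeholder is expanded into a VALUES list.
    yield from execute_values_to_bytes(
        cur, """
        SELECT t.sha1, c.length
        FROM (VALUES %s) AS t(sha1)
        INNER JOIN content c USING (sha1)
        """,
        ((sha1,) for sha1 in sha1s))
```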
@@ -241,9 +247,6 @@
     @stored_procedure('swh_mktemp_entity_history')
     def mktemp_entity_history(self, cur=None): pass
 
-    @stored_procedure('swh_mktemp_bytea')
-    def mktemp_bytea(self, cur=None): pass
-
     def register_listener(self, notify_queue, cur=None):
         """Register a listener for NOTIFY queue `notify_queue`"""
         self._cursor(cur).execute("LISTEN %s" % notify_queue)
@@ -278,14 +281,6 @@
     @stored_procedure('swh_entity_history_add')
     def entity_history_add_from_temp(self, cur=None): pass
 
-    def store_tmp_bytea(self, ids, cur=None):
-        """Store the given identifiers in a new tmp_bytea table"""
-        cur = self._cursor(cur)
-
-        self.mktemp_bytea(cur)
-        self.copy_to(({'id': elem} for elem in ids), 'tmp_bytea',
-                     ['id'], cur)
-
     def content_update_from_temp(self, keys_to_update, cur=None):
         cur = self._cursor(cur)
         cur.execute("""select swh_content_update(ARRAY[%s] :: text[])""" %
@@ -301,7 +296,7 @@
 
     def content_get_metadata_from_sha1s(self, sha1s, cur=None):
         cur = self._cursor(cur)
-        psycopg2.extras.execute_values(
+        yield from execute_values_to_bytes(
             cur, """
             select t.sha1, %s from (values %%s) as t (sha1)
             left join content using (sha1)
@@ -309,23 +304,37 @@
             ((sha1,) for sha1 in sha1s),
         )
 
-        yield from cursor_to_bytes(cur)
+    content_hash_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
 
-    def content_missing_from_temp(self, cur=None):
+    def content_missing_from_list(self, contents, cur=None):
         cur = self._cursor(cur)
-        cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256
-                       FROM swh_content_missing()""")
+        keys = ', '.join(self.content_hash_keys)
+        equality = ' AND '.join(
+            ('t.%s = c.%s' % (key, key))
+            for key in self.content_hash_keys
+        )
 
-        yield from cursor_to_bytes(cur)
+        yield from execute_values_to_bytes(
+            cur, """
+            SELECT %s
+            FROM (VALUES %%s) as t(%s)
+            WHERE NOT EXISTS (
+                SELECT 1 FROM content c
+                WHERE %s
+            )
+            """ % (keys, keys, equality),
+            (tuple(c[key] for key in self.content_hash_keys) for c in contents)
+        )
 
-    def content_missing_per_sha1_from_temp(self, cur=None):
+    def content_missing_per_sha1(self, sha1s, cur=None):
         cur = self._cursor(cur)
-        cur.execute("""SELECT *
-                       FROM swh_content_missing_per_sha1()""")
-
-        yield from cursor_to_bytes(cur)
+        yield from execute_values_to_bytes(cur, """
+            SELECT t.sha1 FROM (VALUES %s) AS t(sha1)
+            WHERE NOT EXISTS (
+                SELECT 1 FROM content c WHERE c.sha1 = t.sha1
+            )""", ((sha1,) for sha1 in sha1s))
 
     def skipped_content_missing_from_temp(self, cur=None):
         cur = self._cursor(cur)
@@ -420,10 +429,15 @@
         else:
             return content
 
-    def directory_missing_from_temp(self, cur=None):
+    def directory_missing_from_list(self, directories, cur=None):
         cur = self._cursor(cur)
-        cur.execute('SELECT * FROM swh_directory_missing()')
-        yield from cursor_to_bytes(cur)
+        yield from execute_values_to_bytes(
+            cur, """
+            SELECT id FROM (VALUES %s) as t(id)
+            WHERE NOT EXISTS (
+                SELECT 1 FROM directory d WHERE d.id = t.id
+            )
+            """, ((id,) for id in directories))
 
     directory_ls_cols = ['dir_id', 'type', 'target', 'name', 'perms',
                          'status', 'sha1', 'sha1_git', 'sha256', 'length']
@@ -458,12 +472,16 @@
             return None
 
         return line_to_bytes(data)
 
-    def revision_missing_from_temp(self, cur=None):
+    def revision_missing_from_list(self, revisions, cur=None):
         cur = self._cursor(cur)
-        cur.execute('SELECT id FROM swh_revision_missing() as r(id)')
-
-        yield from cursor_to_bytes(cur)
+        yield from execute_values_to_bytes(
+            cur, """
+            SELECT id FROM (VALUES %s) as t(id)
+            WHERE NOT EXISTS (
+                SELECT 1 FROM revision r WHERE r.id = t.id
+            )
+            """, ((id,) for id in revisions))
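Editor's note: a sketch of calling the rewritten Db.content_missing_from_list directly; db is an swh.storage.db.Db instance and cur an open cursor, both assumed, and the hash values are made up. Each yielded row is a (sha1, sha1_git, sha256, blake2s256) tuple for a content not yet stored:

```python
contents = [{
    'sha1':       b'\x00' * 20,
    'sha1_git':   b'\x11' * 20,
    'sha256':     b'\x22' * 32,
    'blake2s256': b'\x33' * 32,
    'length':     42,   # ignored: only content_hash_keys are sent to the query
}]
for sha1, sha1_git, sha256, blake2s256 in db.content_missing_from_list(contents, cur):
    print('missing content:', sha1.hex())
```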
 
     revision_add_cols = [
         'id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date',
@@ -625,12 +643,44 @@
         cur.execute(query, (origin_id, visit_id))
         yield from cursor_to_bytes(cur)
 
-    def revision_get_from_temp(self, cur=None):
+    @staticmethod
+    def mangle_query_key(key, main_table):
+        if key == 'id':
+            return 't.id'
+        if key == 'parents':
+            return '''
+            ARRAY(
+            SELECT rh.parent_id::bytea
+            FROM revision_history rh
+            WHERE rh.id = t.id
+            ORDER BY rh.parent_rank
+            )'''
+        if '_' not in key:
+            return '%s.%s' % (main_table, key)
+
+        head, tail = key.split('_', 1)
+        if (head in ('author', 'committer')
+                and tail in ('name', 'email', 'id', 'fullname')):
+            return '%s.%s' % (head, tail)
+
+        return '%s.%s' % (main_table, key)
+
+    def revision_get_from_list(self, revisions, cur=None):
         cur = self._cursor(cur)
-        query = 'SELECT %s FROM swh_revision_get()' % (
-            ', '.join(self.revision_get_cols))
-        cur.execute(query)
-        yield from cursor_to_bytes(cur)
+
+        query_keys = ', '.join(
+            self.mangle_query_key(k, 'revision')
+            for k in self.revision_get_cols
+        )
+
+        yield from execute_values_to_bytes(
+            cur, """
+            SELECT %s FROM (VALUES %%s) as t(id)
+            LEFT JOIN revision ON t.id = revision.id
+            LEFT JOIN person author ON revision.author = author.id
+            LEFT JOIN person committer ON revision.committer = committer.id
+            """ % query_keys,
+            ((id,) for id in revisions))
 
     def revision_log(self, root_revisions, limit=None, cur=None):
         cur = self._cursor(cur)
@@ -654,23 +704,63 @@
         cur.execute(query, (root_revisions, limit))
 
         yield from cursor_to_bytes(cur)
 
-    def release_missing_from_temp(self, cur=None):
+    def release_missing_from_list(self, releases, cur=None):
         cur = self._cursor(cur)
-        cur.execute('SELECT id FROM swh_release_missing() as r(id)')
-        yield from cursor_to_bytes(cur)
+        yield from execute_values_to_bytes(
+            cur, """
+            SELECT id FROM (VALUES %s) as t(id)
+            WHERE NOT EXISTS (
+                SELECT 1 FROM release r WHERE r.id = t.id
+            )
+            """, ((id,) for id in releases))
 
     object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id']
 
     def object_find_by_sha1_git(self, ids, cur=None):
         cur = self._cursor(cur)
 
-        self.store_tmp_bytea(ids, cur)
-        query = 'select %s from swh_object_find_by_sha1_git()' % (
-            ', '.join(self.object_find_by_sha1_git_cols)
+        yield from execute_values_to_bytes(
+            cur, """
+            WITH t (id) AS (VALUES %s),
+            known_objects as ((
+                select
+                  id as sha1_git,
+                  'release'::object_type as type,
+                  id,
+                  object_id
+                from release r
+                where exists (select 1 from t where t.id = r.id)
+            ) union all (
+                select
+                  id as sha1_git,
+                  'revision'::object_type as type,
+                  id,
+                  object_id
+                from revision r
+                where exists (select 1 from t where t.id = r.id)
+            ) union all (
+                select
+                  id as sha1_git,
+                  'directory'::object_type as type,
+                  id,
+                  object_id
+                from directory d
+                where exists (select 1 from t where t.id = d.id)
+            ) union all (
+                select
+                  sha1_git as sha1_git,
+                  'content'::object_type as type,
+                  sha1 as id,
+                  object_id
+                from content c
+                where exists (select 1 from t where t.id = c.sha1_git)
+            ))
+            select t.id as sha1_git, k.type, k.id, k.object_id
+            from t
+            left join known_objects k on t.id = k.sha1_git
+            """,
+            ((id,) for id in ids)
         )
-        cur.execute(query)
-
-        yield from cursor_to_bytes(cur)
 
     def stat_counters(self, cur=None):
         cur = self._cursor(cur)
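Editor's note: for reference, what mangle_query_key resolves a few representative revision_get_cols entries to, per the implementation above (the 'parents' result is the ARRAY(...) correlated subquery, abridged here):

```python
Db.mangle_query_key('id', 'revision')               # 't.id'
Db.mangle_query_key('date', 'revision')             # 'revision.date'
Db.mangle_query_key('committer_date', 'revision')   # 'revision.committer_date'
Db.mangle_query_key('author_fullname', 'revision')  # 'author.fullname'
Db.mangle_query_key('parents', 'revision')          # 'ARRAY(SELECT rh.parent_id::bytea ...)'
```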
@@ -846,14 +936,20 @@
     ]
     release_get_cols = release_add_cols + ['author_id']
 
-    def release_get_from_temp(self, cur=None):
+    def release_get_from_list(self, releases, cur=None):
         cur = self._cursor(cur)
-        query = '''
-        SELECT %s
-            FROM swh_release_get()
-        ''' % ', '.join(self.release_get_cols)
-        cur.execute(query)
-        yield from cursor_to_bytes(cur)
+        query_keys = ', '.join(
+            self.mangle_query_key(k, 'release')
+            for k in self.release_get_cols
+        )
+
+        yield from execute_values_to_bytes(
+            cur, """
+            SELECT %s FROM (VALUES %%s) as t(id)
+            LEFT JOIN release ON t.id = release.id
+            LEFT JOIN person author ON release.author = author.id
+            """ % query_keys,
+            ((id,) for id in releases))
 
     def release_get_by(self,
                        origin_id,
diff --git a/swh/storage/db_utils.py b/swh/storage/db_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/db_utils.py
@@ -0,0 +1,118 @@
+# Copyright (C) 2015-2018  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+#
+# This code has been imported from psycopg2, version 2.7.4,
+# https://github.com/psycopg/psycopg2/tree/5afb2ce803debea9533e293eef73c92ffce95bcd
+# and modified by Software Heritage.
+#
+# Original file: lib/extras.py
+#
+# psycopg2 is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+
+
+import re
+
+import psycopg2.extensions
+
+
+def _paginate(seq, page_size):
+    """Consume an iterable and return it in chunks.
+
+    Every chunk is at most `page_size`. Never return an empty chunk.
+    """
+    page = []
+    it = iter(seq)
+    while 1:
+        try:
+            for i in range(page_size):
+                page.append(next(it))
+            yield page
+            page = []
+        except StopIteration:
+            if page:
+                yield page
+            return
+
+
+def _split_sql(sql):
+    """Split *sql* on a single ``%s`` placeholder.
+
+    Split on the %s, perform %% replacement and return pre, post lists of
+    snippets.
+    """
+    curr = pre = []
+    post = []
+    tokens = re.split(br'(%.)', sql)
+    for token in tokens:
+        if len(token) != 2 or token[:1] != b'%':
+            curr.append(token)
+            continue
+
+        if token[1:] == b's':
+            if curr is pre:
+                curr = post
+            else:
+                raise ValueError(
+                    "the query contains more than one '%s' placeholder")
+        elif token[1:] == b'%':
+            curr.append(b'%')
+        else:
+            raise ValueError("unsupported format character: '%s'"
+                             % token[1:].decode('ascii', 'replace'))
+
+    if curr is pre:
+        raise ValueError("the query doesn't contain any '%s' placeholder")
+
+    return pre, post
+
+
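Editor's note: to make the two helpers above concrete, here is what they return on small inputs (values worked out by hand from the code above, not an official doctest):

```python
>>> list(_paginate(range(5), page_size=2))
[[0, 1], [2, 3], [4]]

>>> _split_sql(b'INSERT INTO mytable (id, name) VALUES %s')
([b'INSERT INTO mytable (id, name) VALUES '], [b''])

>>> _split_sql(b'SELECT id FROM t WHERE pct = 100%% AND id IN (VALUES %s)')
([b'SELECT id FROM t WHERE pct = 100', b'%', b' AND id IN (VALUES '], [b''])
```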
+def execute_values_generator(cur, sql, argslist, template=None, page_size=100):
+    '''Execute a statement using :sql:`VALUES` with a sequence of parameters.
+    Rows returned by the query are returned through a generator.
+    You need to consume the generator for the queries to be executed!
+
+    :param cur: the cursor to use to execute the query.
+
+    :param sql: the query to execute. It must contain a single ``%s``
+        placeholder, which will be replaced by a `VALUES list`__.
+        Example: ``"INSERT INTO mytable (id, f1, f2) VALUES %s"``.
+
+    :param argslist: sequence of sequences or dictionaries with the arguments
+        to send to the query. The type and content must be consistent with
+        *template*.
+
+    :param template: the snippet to merge to every item in *argslist* to
+        compose the query.
+
+        - If the *argslist* items are sequences it should contain positional
+          placeholders (e.g. ``"(%s, %s, %s)"``, or ``"(%s, %s, 42)"`` if there
+          are constant values...).
+
+        - If the *argslist* items are mappings it should contain named
+          placeholders (e.g. ``"(%(id)s, %(f1)s, 42)"``).
+
+        If not specified, assume the arguments are sequences and use a simple
+        positional template (i.e. ``(%s, %s, ...)``), with the number of
+        placeholders sniffed by the first element in *argslist*.
+
+    :param page_size: maximum number of *argslist* items to include in every
+        statement. If there are more items the function will execute more than
+        one statement.
+
+    :param yield_from_cur: Whether to yield results from the cursor in this
+        function directly.
+
+    .. __: https://www.postgresql.org/docs/current/static/queries-values.html
+
+    After the execution of the function the `cursor.rowcount` property will
+    **not** contain a total result.
+    '''
+    # we can't just use sql % vals because vals is bytes: if sql is bytes
+    # there will be some decoding error because of stupid codec used, and Py3
+    # doesn't implement % on bytes.
+    if not isinstance(sql, bytes):
+        sql = sql.encode(
+            psycopg2.extensions.encodings[cur.connection.encoding]
+        )
+    pre, post = _split_sql(sql)
+
+    for page in _paginate(argslist, page_size=page_size):
+        if template is None:
+            template = b'(' + b','.join([b'%s'] * len(page[0])) + b')'
+        parts = pre[:]
+        for args in page:
+            parts.append(cur.mogrify(template, args))
+            parts.append(b',')
+        parts[-1:] = post
+        cur.execute(b''.join(parts))
+        yield from cur
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -28,9 +28,6 @@
 
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
 
-CONTENT_HASH_KEYS = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
-
-
 class Storage():
     """SWH storage proxy, encompassing DB and object storage
 
@@ -109,7 +106,7 @@
         """
         db = self.get_db()
 
-        def _unique_key(hash, keys=CONTENT_HASH_KEYS):
+        def _unique_key(hash, keys=db.content_hash_keys):
             """Given a hash (tuple or dict), return a unique key from the
             aggregation of keys.
 
@@ -246,8 +243,8 @@
         Returns:
             an iterable with content metadata corresponding to the given ids
         """
-        for content_metadata in db.content_get_metadata_from_sha1s(content, cur):
-            yield dict(zip(db.content_get_metadata_keys, content_metadata))
+        for metadata in db.content_get_metadata_from_sha1s(content, cur):
+            yield dict(zip(db.content_get_metadata_keys, metadata))
 
     @db_transaction_generator()
     def content_missing(self, content, key_hash='sha1', db=None, cur=None):
@@ -272,19 +269,17 @@
 
         TODO: an exception when we get a hash collision.
 
         """
-        keys = CONTENT_HASH_KEYS
+        keys = db.content_hash_keys
 
-        if key_hash not in CONTENT_HASH_KEYS:
+        if key_hash not in keys:
             raise ValueError("key_hash should be one of %s" % keys)
 
         key_hash_idx = keys.index(key_hash)
 
-        # Create temporary table for metadata injection
-        db.mktemp('content', cur)
-
-        db.copy_to(content, 'tmp_content', keys + ['length'], cur)
+        if not content:
+            return
 
-        for obj in db.content_missing_from_temp(cur):
+        for obj in db.content_missing_from_list(content, cur):
            yield obj[key_hash_idx]
 
     @db_transaction_generator()
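Editor's note: a minimal sketch of driving execute_values_generator on its own, following the mytable example from its docstring; the connection settings are assumed, and RETURNING is used so there are rows to yield. One statement is issued per page (100 items by default), and nothing runs until the generator is consumed:

```python
import psycopg2

from swh.storage.db_utils import execute_values_generator

conn = psycopg2.connect('dbname=softwareheritage-dev')  # assumed DSN
with conn, conn.cursor() as cur:
    rows = ((i, 'name-%d' % i) for i in range(1000))
    inserted_ids = [
        r[0] for r in execute_values_generator(
            cur,
            'INSERT INTO mytable (id, f1) VALUES %s RETURNING id',
            rows)
    ]  # consuming the generator is what actually executes the statements
```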
""" - db.store_tmp_bytea(contents, cur) - for obj in db.content_missing_per_sha1_from_temp(cur): + for obj in db.content_missing_per_sha1(contents, cur): yield obj[0] @db_transaction_generator() @@ -317,7 +311,7 @@ iterable: missing signatures """ - keys = CONTENT_HASH_KEYS + keys = db.content_hash_keys db.mktemp('skipped_content', cur) db.copy_to(content, 'tmp_skipped_content', @@ -432,14 +426,7 @@ missing directory ids """ - # Create temporary table for metadata injection - db.mktemp('directory', cur) - - directories_dicts = ({'id': dir} for dir in directories) - - db.copy_to(directories_dicts, 'tmp_directory', ['id'], cur) - - for obj in db.directory_missing_from_temp(cur): + for obj in db.directory_missing_from_list(directories, cur): yield obj[0] @db_transaction_generator(statement_timeout=2000) @@ -549,9 +536,10 @@ missing revision ids """ - db.store_tmp_bytea(revisions, cur) + if not revisions: + return - for obj in db.revision_missing_from_temp(cur): + for obj in db.revision_missing_from_list(revisions, cur): yield obj[0] @db_transaction_generator(statement_timeout=500) @@ -566,9 +554,7 @@ revision doesn't exist) """ - db.store_tmp_bytea(revisions, cur) - - for line in db.revision_get_from_temp(cur): + for line in db.revision_get_from_list(revisions, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) @@ -696,10 +682,10 @@ a list of missing release ids """ - # Create temporary table for metadata injection - db.store_tmp_bytea(releases, cur) + if not releases: + return - for obj in db.release_missing_from_temp(cur): + for obj in db.release_missing_from_list(releases, cur): yield obj[0] @db_transaction_generator(statement_timeout=500) @@ -722,10 +708,7 @@ ValueError: if the keys does not match (url and type) nor id. """ - # Create temporary table for metadata injection - db.store_tmp_bytea(releases, cur) - - for release in db.release_get_from_temp(cur): + for release in db.release_get_from_list(releases, cur): yield converters.db_to_release( dict(zip(db.release_get_cols, release)) )