diff --git a/sql/swh-func.sql b/sql/swh-func.sql index c463fc9c..1ff02edd 100644 --- a/sql/swh-func.sql +++ b/sql/swh-func.sql @@ -1,348 +1,383 @@ -- create a temporary table called tmp_TBLNAME, mimicking existing table -- TBLNAME -- -- Args: -- tblname: name of the table to mimick create or replace function swh_mktemp(tblname regclass) returns void language plpgsql as $$ begin execute format(' create temporary table tmp_%I (like %I including defaults) on commit drop ', tblname, tblname); return; end $$; -- create a temporary table for directory entries called tmp_TBLNAME, -- mimicking existing table TBLNAME with an extra dir_id (sha1_git) -- column, and dropping the id column. -- -- This is used to create the tmp_directory_entry_ tables. -- -- Args: -- tblname: name of the table to mimick create or replace function swh_mktemp_dir_entry(tblname regclass) returns void language plpgsql as $$ begin execute format(' create temporary table tmp_%I (like %I including defaults, dir_id sha1_git) on commit drop; alter table tmp_%I drop column id; ', tblname, tblname, tblname, tblname); return; end $$; -- create a temporary table for revisions called tmp_revisions, -- mimicking existing table revision, replacing the foreign keys to -- people with an email and name field -- create or replace function swh_mktemp_revision() returns void language plpgsql as $$ begin create temporary table tmp_revision ( like revision including defaults, author_name text not null default '', author_email text not null default '', committer_name text not null default '', committer_email text not null default '' ) on commit drop; alter table tmp_revision drop column author; alter table tmp_revision drop column committer; return; end $$; -- create a temporary table for releases called tmp_release, -- mimicking existing table release, replacing the foreign keys to -- people with an email and name field -- create or replace function swh_mktemp_release() returns void language plpgsql as $$ begin create temporary table tmp_release ( like release including defaults, author_name text not null default '', author_email text not null default '' ) on commit drop; alter table tmp_release drop column author; return; end $$; -- a content signature is a set of cryptographic checksums that we use to -- uniquely identify content, for the purpose of verifying if we already have -- some content or not during content injection create type content_signature as ( sha1 sha1, sha1_git sha1_git, sha256 sha256 ); -- check which entries of tmp_content are missing from content -- -- operates in bulk: 0. swh_mktemp(content), 1. COPY to tmp_content, -- 2. call this function create or replace function swh_content_missing() returns setof content_signature language plpgsql as $$ begin return query select sha1, sha1_git, sha256 from tmp_content except select sha1, sha1_git, sha256 from content; return; end $$; -- add tmp_content entries to content, skipping duplicates -- -- operates in bulk: 0. swh_mktemp(content), 1. COPY to tmp_content, -- 2. call this function create or replace function swh_content_add() returns void language plpgsql as $$ declare rows bigint; begin insert into content (sha1, sha1_git, sha256, length, status) select distinct sha1, sha1_git, sha256, length, status from tmp_content where (sha1, sha1_git, sha256) in (select * from swh_content_missing()); -- TODO XXX use postgres 9.5 "UPSERT" support here, when available. -- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid -- the extra swh_content_missing() query here. return; end $$; -- check which entries of tmp_directory are missing from directory -- -- operates in bulk: 0. swh_mktemp(directory), 1. COPY to tmp_directory, -- 2. call this function create or replace function swh_directory_missing() returns setof directory language plpgsql as $$ begin return query select id from tmp_directory except select id from directory; return; end $$; -- Add tmp_directory_entry_dir entries to directory_entry_dir and -- directory_list_dir, skipping duplicates in directory_entry_dir. -- -- operates in bulk: 0. swh_mktemp_dir_entry('directory_entry_dir'), 1 COPY to -- tmp_directory_entry_dir, 2. call this function create or replace function swh_directory_entry_dir_add() returns void language plpgsql as $$ begin insert into directory_entry_dir (target, name, perms, atime, mtime, ctime) select distinct t.target, t.name, t.perms, t.atime, t.mtime, t.ctime from tmp_directory_entry_dir t where not exists ( select 1 from directory_entry_dir i where t.target = i.target and t.name = i.name and t.perms = i.perms and t.atime is not distinct from i.atime and t.mtime is not distinct from i.mtime and t.ctime is not distinct from i.ctime); insert into directory_list_dir (entry_id, dir_id) select i.id, t.dir_id from tmp_directory_entry_dir t inner join directory_entry_dir i on t.target = i.target and t.name = i.name and t.perms = i.perms and t.atime is not distinct from i.atime and t.mtime is not distinct from i.mtime and t.ctime is not distinct from i.ctime; return; end $$; -- Add tmp_directory_entry_file entries to directory_entry_file and -- directory_list_file, skipping duplicates in directory_entry_file. -- -- operates in bulk: 0. swh_mktemp_dir_entry('directory_entry_file'), 1 COPY to -- tmp_directory_entry_file, 2. call this function create or replace function swh_directory_entry_file_add() returns void language plpgsql as $$ begin insert into directory_entry_file (target, name, perms, atime, mtime, ctime) select distinct t.target, t.name, t.perms, t.atime, t.mtime, t.ctime from tmp_directory_entry_file t where not exists ( select 1 from directory_entry_file i where t.target = i.target and t.name = i.name and t.perms = i.perms and t.atime is not distinct from i.atime and t.mtime is not distinct from i.mtime and t.ctime is not distinct from i.ctime); insert into directory_list_file (entry_id, dir_id) select i.id, t.dir_id from tmp_directory_entry_file t inner join directory_entry_file i on t.target = i.target and t.name = i.name and t.perms = i.perms and t.atime is not distinct from i.atime and t.mtime is not distinct from i.mtime and t.ctime is not distinct from i.ctime; return; end $$; -- Add tmp_directory_entry_rev entries to directory_entry_rev and -- directory_list_rev, skipping duplicates in directory_entry_rev. -- -- operates in bulk: 0. swh_mktemp_dir_entry('directory_entry_rev'), 1 COPY to -- tmp_directory_entry_rev, 2. call this function create or replace function swh_directory_entry_rev_add() returns void language plpgsql as $$ begin insert into directory_entry_rev (target, name, perms, atime, mtime, ctime) select distinct t.target, t.name, t.perms, t.atime, t.mtime, t.ctime from tmp_directory_entry_rev t where not exists ( select 1 from directory_entry_rev i where t.target = i.target and t.name = i.name and t.perms = i.perms and t.atime is not distinct from i.atime and t.mtime is not distinct from i.mtime and t.ctime is not distinct from i.ctime); insert into directory_list_rev (entry_id, dir_id) select i.id, t.dir_id from tmp_directory_entry_rev t inner join directory_entry_rev i on t.target = i.target and t.name = i.name and t.perms = i.perms and t.atime is not distinct from i.atime and t.mtime is not distinct from i.mtime and t.ctime is not distinct from i.ctime; return; end $$; -- a directory listing entry with all the metadata -- -- can be used to list a directory, and retrieve all the data in one go. create type directory_entry as ( dir_id sha1_git, -- id of the parent directory type text, -- type of entry (one of 'dir', 'file', 'rev') target sha1_git, -- id of target name unix_path, -- path name, relative to containing dir perms file_perms, -- unix-like permissions atime timestamptz, -- time of last access mtime timestamptz, -- time of last modification ctime timestamptz -- time of last status change ); -- List a single level of directory walked_dir_id create or replace function swh_directory_walk_one(walked_dir_id sha1_git) returns setof directory_entry language plpgsql as $$ begin return query ( select dir_id, 'dir' as type, target, name, perms, atime, mtime, ctime from directory_list_dir l left join directory_entry_dir d on l.entry_id = d.id where l.dir_id = walked_dir_id union select dir_id, 'file' as type, target, name, perms, atime, mtime, ctime from directory_list_file l left join directory_entry_file d on l.entry_id = d.id where l.dir_id = walked_dir_id union select dir_id, 'rev' as type, target, name, perms, atime, mtime, ctime from directory_list_rev l left join directory_entry_rev d on l.entry_id = d.id where l.dir_id = walked_dir_id ) order by name; return; end $$; -- List missing revisions from tmp_revision create or replace function swh_revision_missing() returns setof sha1_git language plpgsql as $$ begin return query select id from tmp_revision except select id from revision; return; end $$; -- Create entries in person from tmp_revision create or replace function swh_person_add_from_revision() returns void language plpgsql as $$ begin with t as ( select author_name as name, author_email as email from tmp_revision union select committer_name as name, committer_email as email from tmp_revision ) insert into person (name, email) select distinct name, email from t where not exists ( select 1 from person p where t.name = p.name and t.email = p.email ); return; end $$; -- Create entries in revision from tmp_revision create or replace function swh_revision_add() returns void language plpgsql as $$ begin perform swh_person_add_from_revision(); insert into revision (id, date, date_offset, committer_date, committer_date_offset, type, directory, message, author, committer) select t.id, t.date, t.date_offset, t.committer_date, t.committer_date_offset, t.type, t.directory, t.message, a.id, c.id from tmp_revision t left join person a on a.name = t.author_name and a.email = t.author_email left join person c on c.name = t.committer_name and c.email = t.committer_email; return; end $$; + +-- Create entries in person from tmp_release +create or replace function swh_person_add_from_release() + returns void + language plpgsql +as $$ +begin + with t as ( + select distinct author_name as name, author_email as email from tmp_release + ) insert into person (name, email) + select name, email from t + where not exists ( + select 1 + from person p + where t.name = p.name and t.email = p.email + ); + return; +end +$$; + +-- Create entries in release from tmp_release +create or replace function swh_release_add() + returns void + language plpgsql +as $$ +begin + perform swh_person_add_from_release(); + + insert into release (id, revision, date, date_offset, name, comment, author) + select t.id, t.revision, t.date, t.date_offset, t.name, t.comment, a.id + from tmp_release t + left join person a on a.name = t.author_name and a.email = t.author_email; + return; +end +$$; diff --git a/swh/storage/db.py b/swh/storage/db.py index 1a9c0177..ca147a5e 100644 --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -1,172 +1,175 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import functools import psycopg2 import tempfile from contextlib import contextmanager TMP_CONTENT_TABLE = 'tmp_content' def stored_procedure(stored_proc): """decorator to execute remote stored procedure, specified as argument Generally, the body of the decorated function should be empty. If it is not, the stored procedure will be executed first; the function body then. """ def wrap(meth): @functools.wraps(meth) def _meth(self, *args, **kwargs): cur = kwargs.get('cur', None) self._cursor(cur).execute('SELECT %s()' % stored_proc) meth(self, *args, **kwargs) return _meth return wrap def entry_to_bytes(entry): """Convert an entry coming from the database to bytes""" if isinstance(entry, memoryview): return entry.tobytes() return entry def line_to_bytes(line): """Convert a line coming from the database to bytes""" return line.__class__(entry_to_bytes(entry) for entry in line) def cursor_to_bytes(cursor): """Yield all the data from a cursor as bytes""" yield from (line_to_bytes(line) for line in cursor) class Db: """Proxy to the SWH DB, with wrappers around stored procedures """ @classmethod def connect(cls, *args, **kwargs): """factory method to create a DB proxy Accepts all arguments of psycopg2.connect; only some specific possibilities are reported below. Args: connstring: libpq2 connection string """ conn = psycopg2.connect(*args, **kwargs) return cls(conn) def _cursor(self, cur_arg): """get a cursor: from cur_arg if given, or a fresh one otherwise meant to avoid boilerplate if/then/else in methods that proxy stored procedures """ if cur_arg is not None: return cur_arg # elif self.cur is not None: # return self.cur else: return self.conn.cursor() def __init__(self, conn): """create a DB proxy Args: conn: psycopg2 connection to the SWH DB """ self.conn = conn @contextmanager def transaction(self): """context manager to execute within a DB transaction Yields: a psycopg2 cursor """ with self.conn.cursor() as cur: try: yield cur self.conn.commit() except: if not self.conn.closed: self.conn.rollback() raise def mktemp(self, tblname, cur=None): self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)', (('directory_entry_%s' % entry_type),)) @stored_procedure('swh_mktemp_revision') def mktemp_revision(self, cur=None): pass @stored_procedure('swh_mktemp_release') def mktemp_release(self, cur=None): pass def copy_to(self, items, tblname, columns, cur=None, item_cb=None): def escape(data): if data is None: return '\\N' if isinstance(data, bytes): return '\\\\x%s' % binascii.hexlify(data).decode('ascii') else: return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = '\t'.join([escape(d.get(k)) for k in columns]) + '\n' f.write(line) f.seek(0) self._cursor(cur).copy_from(f, tblname, columns=columns) @stored_procedure('swh_content_add') def content_add_from_temp(self, cur=None): pass @stored_procedure('swh_revision_add') def revision_add_from_temp(self, cur=None): pass + @stored_procedure('swh_release_add') + def release_add_from_temp(self, cur=None): pass + def content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256 FROM swh_content_missing()""") yield from cursor_to_bytes(cur) def directory_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("SELECT id FROM swh_directory_missing()") yield from cursor_to_bytes(cur) def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cur.execute('select * from swh_directory_walk_one(%s)', (directory,)) yield from cursor_to_bytes(cur) def revision_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("SELECT id FROM swh_revision_missing() as r(id)") yield from cursor_to_bytes(cur)