diff --git a/debian/control b/debian/control --- a/debian/control +++ b/debian/control @@ -13,7 +13,7 @@ python3-requests, python3-setuptools, python3-swh.core (>= 0.0.28~), - python3-swh.model (>= 0.0.14~), + python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.17~), python3-swh.scheduler (>= 0.0.11~), python3-vcversioner @@ -23,7 +23,7 @@ Package: python3-swh.storage Architecture: all Depends: python3-swh.core (>= 0.0.28~), - python3-swh.model (>= 0.0.14~), + python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.17~), ${misc:Depends}, ${python3:Depends} @@ -31,7 +31,7 @@ Package: python3-swh.storage.listener Architecture: all -Depends: python3-swh.journal, +Depends: python3-swh.journal (>= 0.0.2~), python3-kafka (>= 1.3.1~), python3-swh.storage (= ${binary:Version}), ${misc:Depends}, diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,5 @@ swh.core >= 0.0.28 -swh.model >= 0.0.14 +swh.model >= 0.0.15 swh.objstorage >= 0.0.17 swh.scheduler >= 0.0.11 +swh.journal >= 0.0.2 diff --git a/sql/swh-func.sql b/sql/swh-func.sql --- a/sql/swh-func.sql +++ b/sql/swh-func.sql @@ -135,9 +135,10 @@ -- uniquely identify content, for the purpose of verifying if we already have -- some content or not during content injection create type content_signature as ( - sha1 sha1, - sha1_git sha1_git, - sha256 sha256 + sha1 sha1, + sha1_git sha1_git, + sha256 sha256, + blake2s256 blake2s256 ); @@ -151,10 +152,12 @@ as $$ begin return query ( - select sha1, sha1_git, sha256 from tmp_content as tmp + select sha1, sha1_git, sha256, blake2s256 from tmp_content as tmp where not exists ( select 1 from content as c - where c.sha1 = tmp.sha1 and c.sha1_git = tmp.sha1_git and c.sha256 = tmp.sha256 + where c.sha1 = tmp.sha1 and + c.sha1_git = tmp.sha1_git and + c.sha256 = tmp.sha256 ) ); return; @@ -189,7 +192,7 @@ as $$ begin return query - select sha1, sha1_git, sha256 from tmp_skipped_content t + select sha1, 
sha1_git, sha256, blake2s256 from tmp_skipped_content t where not exists (select 1 from skipped_content s where s.sha1 is not distinct from t.sha1 and @@ -210,9 +213,10 @@ -- (e.g., for the web app), for batch lookup of missing content (e.g., to be -- added) see swh_content_missing create or replace function swh_content_find( - sha1 sha1 default NULL, - sha1_git sha1_git default NULL, - sha256 sha256 default NULL + sha1 sha1 default NULL, + sha1_git sha1_git default NULL, + sha256 sha256 default NULL, + blake2s256 blake2s256 default NULL ) returns content language plpgsql @@ -231,12 +235,15 @@ if sha256 is not null then filters := filters || format('sha256 = %L', sha256); end if; + if blake2s256 is not null then + filters := filters || format('blake2s256 = %L', blake2s256); + end if; if cardinality(filters) = 0 then return null; else q = format('select * from content where %s', - array_to_string(filters, ' and ')); + array_to_string(filters, ' and ')); execute q into con; return con; end if; @@ -253,10 +260,10 @@ language plpgsql as $$ begin - insert into content (sha1, sha1_git, sha256, length, status) - select distinct sha1, sha1_git, sha256, length, status + insert into content (sha1, sha1_git, sha256, blake2s256, length, status) + select distinct sha1, sha1_git, sha256, blake2s256, length, status from tmp_content where (sha1, sha1_git, sha256) in - (select * from swh_content_missing()); + (select sha1, sha1_git, sha256 from swh_content_missing()); -- TODO XXX use postgres 9.5 "UPSERT" support here, when available. -- Specifically, using "INSERT .. 
ON CONFLICT IGNORE" we can avoid @@ -275,11 +282,11 @@ language plpgsql as $$ begin - insert into skipped_content (sha1, sha1_git, sha256, length, status, reason, origin) - select distinct sha1, sha1_git, sha256, length, status, reason, origin + insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, status, reason, origin) + select distinct sha1, sha1_git, sha256, blake2s256, length, status, reason, origin from tmp_skipped_content - where (coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')) in - (select coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '') from swh_skipped_content_missing()); + where (coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, ''), coalesce(blake2s256, '')) in + (select coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, ''), coalesce(blake2s256, '') from swh_skipped_content_missing()); -- TODO XXX use postgres 9.5 "UPSERT" support here, when available. -- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid -- the extra swh_content_missing() query here. @@ -287,7 +294,6 @@ end $$; - -- Update content entries from temporary table. 
-- (columns are potential new columns added to the schema, this cannot be empty) -- diff --git a/sql/swh-indexes.sql b/sql/swh-indexes.sql --- a/sql/swh-indexes.sql +++ b/sql/swh-indexes.sql @@ -2,7 +2,8 @@ create unique index concurrently content_pkey on content(sha1); create unique index concurrently on content(sha1_git); -create unique index concurrently on content(sha256); +create index concurrently on content(sha256); +create index concurrently on content(blake2s256); create index concurrently on content(ctime); -- TODO use a BRIN index here (postgres >= 9.5) create index concurrently on content(object_id); @@ -77,7 +78,8 @@ create unique index concurrently on skipped_content(sha1); create unique index concurrently on skipped_content(sha1_git); -create unique index concurrently on skipped_content(sha256); +create index concurrently on skipped_content(sha256); +create index concurrently on skipped_content(blake2s256); create index concurrently on skipped_content(object_id); alter table skipped_content add constraint skipped_content_origin_fkey foreign key (origin) references origin(id) not valid; diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql --- a/sql/swh-schema.sql +++ b/sql/swh-schema.sql @@ -14,7 +14,7 @@ ); insert into dbversion(version, release, description) - values(103, now(), 'Work In Progress'); + values(104, now(), 'Work In Progress'); -- a SHA1 checksum (not necessarily originating from Git) create domain sha1 as bytea check (length(value) = 20); @@ -25,6 +25,9 @@ -- a SHA256 checksum create domain sha256 as bytea check (length(value) = 32); +-- a blake2 checksum +create domain blake2s256 as bytea check (length(value) = 32); + -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; @@ -37,14 +40,15 @@ -- content collisions not knowingly. 
create table content ( - sha1 sha1 not null, - sha1_git sha1_git not null, - sha256 sha256 not null, - length bigint not null, - ctime timestamptz not null default now(), - -- creation time, i.e. time of (first) injection into the storage - status content_status not null default 'visible', - object_id bigserial + sha1 sha1 not null, + sha1_git sha1_git not null, + sha256 sha256 not null, + blake2s256 blake2s256, + length bigint not null, + ctime timestamptz not null default now(), + -- creation time, i.e. time of (first) injection into the storage + status content_status not null default 'visible', + object_id bigserial ); @@ -165,15 +169,16 @@ -- out which origin contains that skipped content. create table skipped_content ( - sha1 sha1, - sha1_git sha1_git, - sha256 sha256, - length bigint not null, - ctime timestamptz not null default now(), - status content_status not null default 'absent', - reason text not null, - origin bigint, - object_id bigserial + sha1 sha1, + sha1_git sha1_git, + sha256 sha256, + blake2s256 blake2s256, + length bigint not null, + ctime timestamptz not null default now(), + status content_status not null default 'absent', + reason text not null, + origin bigint, + object_id bigserial ); -- Log of all origin fetches (i.e., origin crawling) that have been done in the diff --git a/sql/swh-triggers.sql b/sql/swh-triggers.sql --- a/sql/swh-triggers.sql +++ b/sql/swh-triggers.sql @@ -7,7 +7,8 @@ perform pg_notify('new_content', json_build_object( 'sha1', encode(new.sha1, 'hex'), 'sha1_git', encode(new.sha1_git, 'hex'), - 'sha256', encode(new.sha256, 'hex') + 'sha256', encode(new.sha256, 'hex'), + 'blake2s256', encode(new.blake2s256, 'hex') )::text); return null; end; @@ -45,7 +46,8 @@ perform pg_notify('new_skipped_content', json_build_object( 'sha1', encode(new.sha1, 'hex'), 'sha1_git', encode(new.sha1_git, 'hex'), - 'sha256', encode(new.sha256, 'hex') + 'sha256', encode(new.sha256, 'hex'), + 'blake2s256', encode(new.blake2s256, 'hex') 
)::text); return null; end; diff --git a/sql/upgrades/104.sql b/sql/upgrades/104.sql new file mode 100644 --- /dev/null +++ b/sql/upgrades/104.sql @@ -0,0 +1,169 @@ +-- SWH DB schema upgrade +-- from_version: 103 +-- to_version: 104 +-- description: Compute new hash blake2s256 for contents + +insert into dbversion(version, release, description) + values(104, now(), 'Work In Progress'); + +-- a blake2 checksum +create domain blake2s256 as bytea check (length(value) = 32); + +alter table content add column blake2s256 blake2s256; + +drop index content_sha256_idx; +create index concurrently on content(sha256); +create index concurrently on content(blake2s256); + +-- Asynchronous notification of new content insertions +create or replace function notify_new_content() + returns trigger + language plpgsql +as $$ + begin + perform pg_notify('new_content', json_build_object( + 'sha1', encode(new.sha1, 'hex'), + 'sha1_git', encode(new.sha1_git, 'hex'), + 'sha256', encode(new.sha256, 'hex'), + 'blake2s256', encode(new.blake2s256, 'hex') + )::text); + return null; + end; +$$; + +alter table skipped_content add column blake2s256 blake2s256; + +drop index skipped_content_sha256_idx; +create index concurrently on skipped_content(sha256); +create index concurrently on skipped_content(blake2s256); + +-- Asynchronous notification of new skipped content insertions +create or replace function notify_new_skipped_content() + returns trigger + language plpgsql +as $$ + begin + perform pg_notify('new_skipped_content', json_build_object( + 'sha1', encode(new.sha1, 'hex'), + 'sha1_git', encode(new.sha1_git, 'hex'), + 'sha256', encode(new.sha256, 'hex'), + 'blake2s256', encode(new.blake2s256, 'hex') + )::text); + return null; + end; +$$; + +create or replace function swh_content_add() + returns void + language plpgsql +as $$ +begin + insert into content (sha1, sha1_git, sha256, blake2s256, length, status) + select distinct sha1, sha1_git, sha256, blake2s256, length, status + from tmp_content 
+ where (sha1, sha1_git, sha256) in + (select sha1, sha1_git, sha256 from swh_content_missing()); + -- TODO XXX use postgres 9.5 "UPSERT" support here, when available. + -- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid + -- the extra swh_content_missing() query here. + return; +end +$$; + +create or replace function swh_skipped_content_add() + returns void + language plpgsql +as $$ +begin + insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, status, reason, origin) + select distinct sha1, sha1_git, sha256, blake2s256, length, status, reason, origin + from tmp_skipped_content + where (coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, ''), coalesce(blake2s256, '')) in + (select coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, ''), coalesce(blake2s256, '') from swh_skipped_content_missing()); + -- TODO XXX use postgres 9.5 "UPSERT" support here, when available. + -- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid + -- the extra swh_content_missing() query here. 
+ return; +end +$$; + +drop type content_signature cascade; + +create type content_signature as ( + sha1 sha1, + sha1_git sha1_git, + sha256 sha256, + blake2s256 blake2s256 +); + +create or replace function swh_content_missing() + returns setof content_signature + language plpgsql +as $$ +begin + return query ( + select sha1, sha1_git, sha256, blake2s256 from tmp_content as tmp + where not exists ( + select 1 from content as c + where c.sha1 = tmp.sha1 and + c.sha1_git = tmp.sha1_git and + c.sha256 = tmp.sha256 + ) + ); + return; +end +$$; + +create or replace function swh_skipped_content_missing() + returns setof content_signature + language plpgsql +as $$ +begin + return query + select sha1, sha1_git, sha256, blake2s256 from tmp_skipped_content t + where not exists + (select 1 from skipped_content s where + s.sha1 is not distinct from t.sha1 and + s.sha1_git is not distinct from t.sha1_git and + s.sha256 is not distinct from t.sha256); + return; +end +$$; + +create or replace function swh_content_find( + sha1 sha1 default NULL, + sha1_git sha1_git default NULL, + sha256 sha256 default NULL, + blake2s256 blake2s256 default NULL +) + returns content + language plpgsql +as $$ +declare + con content; + filters text[] := array[] :: text[]; -- AND-clauses used to filter content + q text; +begin + if sha1 is not null then + filters := filters || format('sha1 = %L', sha1); + end if; + if sha1_git is not null then + filters := filters || format('sha1_git = %L', sha1_git); + end if; + if sha256 is not null then + filters := filters || format('sha256 = %L', sha256); + end if; + if blake2s256 is not null then + filters := filters || format('blake2s256 = %L', blake2s256); + end if; + + if cardinality(filters) = 0 then + return null; + else + q = format('select * from content where %s', + array_to_string(filters, ' and ')); + execute q into con; + return con; + end if; +end +$$; diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py 
@@ -268,8 +268,12 @@ cur.execute("""select swh_content_update(ARRAY[%s] :: text[])""" % keys_to_update) - content_get_metadata_keys = ['sha1', 'sha1_git', 'sha256', 'length', - 'status'] + content_get_metadata_keys = [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status'] + + skipped_content_keys = [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', + 'length', 'reason', 'status', 'origin'] def content_get_metadata_from_temp(self, cur=None): cur = self._cursor(cur) @@ -282,7 +286,7 @@ def content_missing_from_temp(self, cur=None): cur = self._cursor(cur) - cur.execute("""SELECT sha1, sha1_git, sha256 + cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256 FROM swh_content_missing()""") yield from cursor_to_bytes(cur) @@ -298,7 +302,7 @@ def skipped_content_missing_from_temp(self, cur=None): cur = self._cursor(cur) - cur.execute("""SELECT sha1, sha1_git, sha256 + cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256 FROM swh_skipped_content_missing()""") yield from cursor_to_bytes(cur) @@ -319,24 +323,30 @@ yield from cursor_to_bytes(cur) - def content_find(self, sha1=None, sha1_git=None, sha256=None, cur=None): + content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', + 'ctime', 'status'] + + def content_find(self, sha1=None, sha1_git=None, sha256=None, + blake2s256=None, cur=None): """Find the content optionally on a combination of the following - checksums sha1, sha1_git or sha256. + checksums sha1, sha1_git, sha256 or blake2s256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content + blake2s256: blake2s256 content Returns: - The triplet (sha1, sha1_git, sha256) if found or None. + The tuple (sha1, sha1_git, sha256, blake2s256) if found or None. 
""" cur = self._cursor(cur) - cur.execute("""SELECT sha1, sha1_git, sha256, length, ctime, status - FROM swh_content_find(%s, %s, %s) - LIMIT 1""", (sha1, sha1_git, sha256)) + cur.execute("""SELECT %s + FROM swh_content_find(%%s, %%s, %%s, %%s) + LIMIT 1""" % ','.join(self.content_find_cols), + (sha1, sha1_git, sha256, blake2s256)) content = line_to_bytes(cur.fetchone()) if set(content) == {None}: diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -102,7 +102,7 @@ missing_content = set(self.content_missing(content_with_data)) missing_skipped = set( - sha1_git for sha1, sha1_git, sha256 + sha1_git for _, sha1_git, _, _ in self.skipped_content_missing(content_without_data)) with db.transaction() as cur: @@ -118,7 +118,7 @@ if cont['sha1'] in missing_content) db.copy_to(content_filtered, 'tmp_content', - ['sha1', 'sha1_git', 'sha256', 'length', 'status'], + db.content_get_metadata_keys, cur, item_cb=add_to_objstorage) # move metadata in place @@ -127,10 +127,10 @@ if missing_skipped: missing_filtered = (cont for cont in content_without_data if cont['sha1_git'] in missing_skipped) + db.mktemp('skipped_content', cur) db.copy_to(missing_filtered, 'tmp_skipped_content', - ['sha1', 'sha1_git', 'sha256', 'length', - 'reason', 'status', 'origin'], cur) + db.skipped_content_keys, cur) # move metadata in place db.skipped_content_add_from_temp(cur) @@ -232,7 +232,7 @@ """ db = self.db - keys = ['sha1', 'sha1_git', 'sha256'] + keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] if key_hash not in keys: raise ValueError("key_hash should be one of %s" % keys) @@ -278,7 +278,7 @@ Returns: an iterable of signatures missing from the storage """ - keys = ['sha1', 'sha1_git', 'sha256'] + keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] db = self.db @@ -310,15 +310,15 @@ if not set(content).intersection(ALGORITHMS): raise ValueError('content keys must contain at least one of: ' - 'sha1, sha1_git, sha256') + 'sha1, 
sha1_git, sha256, blake2s256') c = db.content_find(sha1=content.get('sha1'), sha1_git=content.get('sha1_git'), sha256=content.get('sha256'), + blake2s256=content.get('blake2s256'), cur=cur) if c: - keys = ['sha1', 'sha1_git', 'sha256', 'length', 'ctime', 'status'] - return dict(zip(keys, c)) + return dict(zip(db.content_find_cols, c)) return None @db_transaction_generator diff --git a/swh/storage/tests/test_db.py b/swh/storage/tests/test_db.py --- a/swh/storage/tests/test_db.py +++ b/swh/storage/tests/test_db.py @@ -43,9 +43,11 @@ 'sha256': hash_to_bytes( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), + 'blake2s256': hash_to_bytes('69217a3079908094e11121d042354a7c' + '1f55b6482ca1a51e1b250dfd1ed0eef9'), 'length': 3}], 'tmp_content', - ['sha1', 'sha1_git', 'sha256', 'length'], + ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length'], cur) self.db.content_add_from_temp(cur) self.cursor.execute('SELECT sha1 FROM content WHERE sha1 = %s', diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -65,6 +65,8 @@ 'sha256': hash_to_bytes( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), + 'blake2s256': hash_to_bytes('d5fe1939576527e42cfd76a9455a2' + '432fe7f56669564577dd93c4280e76d661d'), 'status': 'visible', } @@ -78,6 +80,8 @@ 'sha256': hash_to_bytes( '859f0b154fdb2d630f45e1ecae4a8629' '15435e663248bb8461d914696fc047cd'), + 'blake2s256': hash_to_bytes('849c20fad132b7c2d62c15de310adfe87be' + '94a379941bed295e8141c6219810d'), 'status': 'visible', } @@ -91,6 +95,8 @@ 'sha256': hash_to_bytes( '92fb72daf8c6818288a35137b72155f5' '07e5de8d892712ab96277aaed8cf8a36'), + 'blake2s256': hash_to_bytes('76d0346f44e5a27f6bafdd9c2befd304af' + 'f83780f93121d801ab6a1d4769db11'), 'status': 'visible', } @@ -104,6 +110,8 @@ 'sha256': hash_to_bytes( '6bbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), + 'blake2s256': 
hash_to_bytes('306856b8fd879edb7b6f1aeaaf8db9bbecc9' + '93cd7f776c333ac3a782fa5c6eba'), 'status': 'absent', } @@ -627,21 +635,22 @@ self.storage.content_add([cont]) self.storage.content_add([cont2]) - self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status,' - 'reason FROM skipped_content ORDER BY sha1_git') + self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' + 'length, status, reason ' + 'FROM skipped_content ORDER BY sha1_git') datum = self.cursor.fetchone() self.assertEqual( (datum[0], datum[1].tobytes(), datum[2], - datum[3], datum[4], datum[5]), - (None, cont['sha1_git'], None, + datum[3], datum[4], datum[5], datum[6]), + (None, cont['sha1_git'], None, None, cont['length'], 'absent', 'Content too long')) datum2 = self.cursor.fetchone() self.assertEqual( (datum2[0], datum2[1].tobytes(), datum2[2], - datum2[3], datum2[4], datum2[5]), - (None, cont2['sha1_git'], None, + datum2[3], datum2[4], datum2[5], datum2[6]), + (None, cont2['sha1_git'], None, None, cont2['length'], 'absent', 'Content too long')) @istest @@ -1898,6 +1907,7 @@ 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], + 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) @@ -1911,6 +1921,7 @@ 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], + 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) @@ -1924,21 +1935,25 @@ 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], + 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) # 4. 
with something to find - actually_present = self.storage.content_find( - {'sha1': cont['sha1'], - 'sha1_git': cont['sha1_git'], - 'sha256': cont['sha256']}) + actually_present = self.storage.content_find({ + 'sha1': cont['sha1'], + 'sha1_git': cont['sha1_git'], + 'sha256': cont['sha256'], + 'blake2s256': cont['blake2s256'], + }) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], + 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' })