diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql
index 3290ffe..0274bd1 100644
--- a/sql/archiver/swh-archiver-func.sql
+++ b/sql/archiver/swh-archiver-func.sql
@@ -1,92 +1,108 @@
create or replace function swh_mktemp_content_archive()
    returns void
    language sql
as $$
    create temporary table tmp_content_archive (
        like content_archive including defaults
    ) on commit drop;
    alter table tmp_content_archive drop column copies;
    alter table tmp_content_archive drop column num_present;
$$;

COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive';

create or replace function swh_content_archive_missing(backend_name text)
    returns setof sha1
    language plpgsql
as $$
begin
    return query
        select content_id
        from tmp_content_archive tmp
        where exists (
            select 1
            from content_archive c
            where tmp.content_id = c.content_id
                and (not c.copies ? backend_name
                     or c.copies @> jsonb_build_object(
                            backend_name,
                            '{"status": "missing"}'::jsonb))
        );
end
$$;

COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend';

create or replace function swh_content_archive_unknown()
    returns setof sha1
    language plpgsql
as $$
begin
    return query
        select content_id
        from tmp_content_archive tmp
        where not exists (
            select 1
            from content_archive c
            where tmp.content_id = c.content_id
        );
end
$$;

COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1s';

CREATE OR REPLACE FUNCTION count_copies(from_id bytea, to_id bytea)
    returns void
    language sql
as $$
    with sample as (
        select content_id, copies
        from content_archive
        where content_id > from_id and content_id <= to_id
    ), data as (
        select substring(content_id from 19) as bucket, jbe.key as archive
        from sample
        join lateral jsonb_each(copies) jbe on true
        where jbe.value->>'status' = 'present'
    ), bucketed as (
        select bucket, archive, count(*) as count
        from data
        group by bucket, archive
    )
    update content_archive_counts cac
        set count = cac.count + bucketed.count
        from bucketed
        where cac.archive = bucketed.archive and cac.bucket = bucketed.bucket;
$$;

comment on function count_copies(bytea, bytea) is 'Count the objects between from_id and to_id, add the results to content_archive_counts';

CREATE OR REPLACE FUNCTION init_content_archive_counts()
    returns void
    language sql
as $$
    insert into content_archive_counts (
        select id, decode(lpad(to_hex(bucket), 4, '0'), 'hex')::bucket as bucket, 0 as count
        from archive
        join lateral generate_series(0, 65535) bucket on true
    ) on conflict (archive, bucket) do nothing;
$$;

comment on function init_content_archive_counts() is 'Initialize the content archive counts for the registered archives';

create type content_archive_count as (
    archive text,
    count bigint
);

create or replace function get_content_archive_counts()
    returns setof content_archive_count
    language sql
as $$
    select archive, sum(count)::bigint
    from content_archive_counts
    group by archive
    order by archive;
$$;

comment on function get_content_archive_counts() is 'Get count for each archive';
+
+-- Add new content_archive from temporary table, skipping duplicates.
+create or replace function swh_content_archive_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    insert into content_archive (content_id, copies, num_present)
+        select distinct content_id, copies, num_present
+        from tmp_content_archive
+        on conflict(content_id) do nothing;
+    return;
+end
+$$;
+
+comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive';
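+
+-- Usage sketch (illustrative comment only): stage rows in the temporary
+-- table created by swh_mktemp_content_archive(), COPY the new entries into
+-- it, then call this function to flush them into content_archive:
+--
+--   select swh_mktemp_content_archive();
+--   -- ... COPY rows into tmp_content_archive ...
+--   select swh_content_archive_add();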
diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql
index aac607e..1514d3e 100644
--- a/sql/archiver/swh-archiver-schema.sql
+++ b/sql/archiver/swh-archiver-schema.sql
@@ -1,121 +1,121 @@
-- In order to archive the content of the object storage, add
-- some tables to keep track of what has already been archived.

create table dbversion
(
  version     int primary key,
  release     timestamptz,
  description text
);

comment on table dbversion is 'Schema update tracking';

INSERT INTO dbversion(version, release, description)
-VALUES(8, now(), 'Work In Progress');
+VALUES(9, now(), 'Work In Progress');

CREATE TABLE archive (
  id  text PRIMARY KEY
);

comment on table archive is 'Possible archives';
comment on column archive.id is 'Short identifier for the archive';

CREATE TYPE archive_status AS ENUM (
  'missing',
  'ongoing',
  'present',
  'corrupted'
);

comment on type archive_status is 'Status of a given archive';

-- a SHA1 checksum (not necessarily originating from Git)
CREATE DOMAIN sha1 AS bytea CHECK (LENGTH(VALUE) = 20);

-- a bucket for which we count items
CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2);

CREATE TABLE content_archive (
  content_id  sha1 primary key,
  copies      jsonb,
  num_present int default null
);

create index on content_archive(num_present);

comment on table content_archive is 'Referencing the status and whereabouts of a content';
comment on column content_archive.content_id is 'content identifier';
comment on column content_archive.copies is 'map archive_id -> { "status": archive_status, "mtime": epoch timestamp }';
comment on column content_archive.num_present is 'Number of copies marked as present (cache updated via trigger)';

CREATE TABLE content_archive_counts (
  archive text not null references archive(id),
  bucket  bucket not null,
  count   bigint,
  primary key (archive, bucket)
);

comment on table content_archive_counts is 'Bucketed count of archive contents';
comment on column content_archive_counts.archive is 'the archive for which we''re counting';
comment on column content_archive_counts.bucket is 'the bucket of items we''re counting';
comment on column content_archive_counts.count is 'the number of items counted in the given bucket';

-- Keep the num_present cache updated
CREATE FUNCTION update_num_present()
  RETURNS TRIGGER
AS $$
  BEGIN
    NEW.num_present := (select count(*) from jsonb_each(NEW.copies) where value->>'status' = 'present');
    RETURN new;
  END;
$$ LANGUAGE PLPGSQL;

CREATE TRIGGER update_num_present
  BEFORE INSERT OR UPDATE OF copies ON content_archive
  FOR EACH ROW
  EXECUTE PROCEDURE update_num_present();

-- keep the content_archive_counts updated
CREATE OR REPLACE FUNCTION update_content_archive_counts()
    RETURNS TRIGGER
    LANGUAGE PLPGSQL
AS $$
    DECLARE
        content_id sha1;
        content_bucket bucket;
        copies record;
        old_row content_archive;
        new_row content_archive;
    BEGIN
        -- default values for old or new row depending on trigger type
        if tg_op = 'INSERT' then
            old_row := (null::sha1, '{}'::jsonb, 0);
        else
            old_row := old;
        end if;
        if tg_op = 'DELETE' then
            new_row := (null::sha1, '{}'::jsonb, 0);
        else
            new_row := new;
        end if;

        -- get the content bucket
        content_id := coalesce(old_row.content_id, new_row.content_id);
        content_bucket := substring(content_id from 19)::bucket;

        -- compare copies present in old and new row for each archive type
        FOR copies IN
            select coalesce(o.key, n.key) as archive,
                   o.value->>'status' as old_status,
                   n.value->>'status' as new_status
            from jsonb_each(old_row.copies) o
            full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key
        LOOP
            -- the count didn't change
            CONTINUE WHEN copies.old_status is not distinct from copies.new_status OR
                          (copies.old_status != 'present' AND copies.new_status != 'present');

            update content_archive_counts cac
                set count = count + (case when copies.old_status = 'present' then -1 else 1 end)
                where archive = copies.archive and bucket = content_bucket;
        END LOOP;
        return null;
    END;
$$;

create trigger update_content_archive_counts
    AFTER INSERT OR UPDATE OR DELETE ON content_archive
    FOR EACH ROW
    EXECUTE PROCEDURE update_content_archive_counts();
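
-- Worked example (illustrative comment only): substring(content_id from 19)
-- keeps the last two of a sha1's twenty bytes, so a content whose id ends in
-- the bytes 0xab 0xcd is counted under bucket '\xabcd' for every archive
-- where it is present.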
""" query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id = %s ORDER BY content_id """ cur = self._cursor(cur) cur.execute(query, (content_id,)) row = cur.fetchone() if not row: return None content_id, present, ongoing, mtimes = row return (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s AND num_present < %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, retention_policy, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) @stored_procedure('swh_mktemp_content_archive') def mktemp_content_archive(self, cur=None): """Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction. + """ + pass + + @stored_procedure('swh_add_content_archive') + def add_content_archive_from_temp(self, cur=None): + """Add new content archive entries from temporary table. 
+
        Use from archiver.storage module:
            self.db.mktemp_content_archive()
            # copy data over to the temp table
            self.db.copy_to([{'colname': id0}, {'colname': id1}],
                            'tmp_content_archive', ['colname'], cur)
+            # insert into the main table
+            self.db.add_content_archive_from_temp(cur)
        """
        pass

    def content_archive_get_missing(self, backend_name, cur=None):
        """Retrieve the content missing from backend_name.

        """
        cur = self._cursor(cur)
        cur.execute("select * from swh_content_archive_missing(%s)",
                    (backend_name,))

        yield from cursor_to_bytes(cur)

    def content_archive_get_unknown(self, cur=None):
        """Retrieve unknown sha1 from archiver db.

        """
        cur = self._cursor(cur)
        cur.execute('select * from swh_content_archive_unknown()')

        yield from cursor_to_bytes(cur)

    def content_archive_update(self, content_id, archive_id,
                               new_status=None, cur=None):
        """ Update the status of an archive content and set its mtime to now.

        Change the mtime of an archived content for the given archive and
        set its mtime to the current time.

        Args:
            content_id (str): content sha1
            archive_id (str): name of the archive
            new_status (str): one of 'missing', 'present' or 'ongoing'.
                this status will replace the previous one. If not given,
                the function only changes the mtime of the content for the
                given archive.

        """
        if isinstance(content_id, bytes):
            content_id = '\\x%s' % hashutil.hash_to_hex(content_id)

        if new_status is not None:
            query = """UPDATE content_archive
                    SET copies=jsonb_set(
                        copies, '{%s}',
                        '{"status":"%s", "mtime":%d}'
                    )
                    WHERE content_id='%s'
                    """ % (archive_id,
                           new_status, int(time.time()),
                           content_id)
        else:
            query = """UPDATE content_archive
                    SET copies=jsonb_set(copies, '{%s,mtime}', '%d')
                    WHERE content_id='%s'
                    """ % (archive_id, int(time.time()), content_id)

        cur = self._cursor(cur)
        cur.execute(query)

    def content_archive_add(
            self, content_id, sources_present, sources_missing, cur=None):
        """Add content archive entry for the content content_id.

        The status is:
        - present for all sources in sources_present.
        - missing for all sources in sources_missing.
""" if isinstance(content_id, bytes): content_id = '\\x%s' % hashutil.hash_to_hex(content_id) copies = {} num_present = 0 for source in sources_present: copies[source] = { "status": "present", "mtime": int(time.time()), } num_present += 1 for source in sources_missing: copies[source] = { "status": "missing", } query = """INSERT INTO content_archive(content_id, copies, num_present) VALUES('%s', '%s', %s) """ % (content_id, json.dumps(copies), num_present) cur = self._cursor(cur) cur.execute(query) diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py index f60c77c..a63301a 100644 --- a/swh/storage/archiver/storage.py +++ b/swh/storage/archiver/storage.py @@ -1,173 +1,199 @@ -# Copyright (C) 2016 The Software Heritage developers +# Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import json import psycopg2 +import time from .db import ArchiverDb from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError class ArchiverStorage(): """SWH Archiver storage proxy, encompassing DB """ def __init__(self, db_conn): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ try: if isinstance(db_conn, psycopg2.extensions.connection): self.db = ArchiverDb(db_conn) else: self.db = ArchiverDb.connect(db_conn) except psycopg2.OperationalError as e: raise StorageDBError(e) @db_transaction_generator def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ yield from self.db.archive_ls(cur) @db_transaction def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content. Retrieve from the database the archival status of the given content Args: content_id: the sha1 of the content Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ return self.db.content_archive_get(content_id, cur) @db_transaction_generator def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from self.db.content_archive_get_copies(last_content, limit, cur) @db_transaction_generator def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. 
""" yield from self.db.content_archive_get_unarchived_copies( retention_policy, last_content, limit, cur) @db_transaction_generator def content_archive_get_missing(self, content_ids, backend_name, cur=None): """Retrieve missing sha1s from source_name. Args: content_ids ([sha1s]): list of sha1s to test source_name (str): Name of the backend to check for content Yields: missing sha1s from backend_name """ db = self.db db.mktemp_content_archive() db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_missing(backend_name, cur): yield content_id[0] @db_transaction_generator def content_archive_get_unknown(self, content_ids, cur=None): """Retrieve unknown sha1s from content_archive. Args: content_ids ([sha1s]): list of sha1s to test Yields: Unknown sha1s from content_archive """ db = self.db db.mktemp_content_archive() db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_unknown(cur): yield content_id[0] @db_transaction def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to now Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ self.db.content_archive_update(content_id, archive_id, new_status, cur) @db_transaction def content_archive_add( self, content_ids, sources_present, sources_missing, cur=None): """Insert a new entry in db about content_id. Args: content_ids ([bytes|str]): content identifiers sources_present ([str]): List of source names where contents are present sources_missing ([str]): List of sources names where contents are missing """ - for content_id in content_ids: - self.db.content_archive_add( - content_id, sources_present, sources_missing) + copies = {} + num_present = 0 + for source in sources_present: + copies[source] = { + "status": "present", + "mtime": int(time.time()), + } + num_present += 1 + + for source in sources_missing: + copies[source] = { + "status": "missing", + } + + copies = json.dumps(copies) + num_present = len(sources_present) + + db = self.db + db.mktemp_content_archive(cur) + db.copy_to(({'content_id': id, + 'copies': copies, + 'num_present': num_present} + for id in content_ids), + 'tmp_content_archive', + ['content_id', 'copies', 'num_present'], + cur) + db.content_archive_add_from_temp(cur)