diff --git a/PKG-INFO b/PKG-INFO
index 359299e12..6a2696938 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.storage
-Version: 0.0.87
+Version: 0.0.88
 Summary: Software Heritage storage manager
 Home-page: https://forge.softwareheritage.org/diffusion/DSTO/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/debian/control b/debian/control
index 18b99f504..d3dfcc1f6 100644
--- a/debian/control
+++ b/debian/control
@@ -1,57 +1,48 @@
 Source: swh-storage
 Maintainer: Software Heritage developers
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python,
                python3-all,
                python3-click,
                python3-dateutil,
                python3-flask,
                python3-nose,
                python3-psycopg2,
                python3-requests,
                python3-setuptools,
                python3-swh.core (>= 0.0.28~),
                python3-swh.model (>= 0.0.15~),
                python3-swh.objstorage (>= 0.0.17~),
                python3-swh.scheduler (>= 0.0.14~),
                python3-aiohttp,
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DSTO/

 Package: python3-swh.storage
 Architecture: all
 Depends: python3-swh.core (>= 0.0.28~),
          python3-swh.model (>= 0.0.15~),
          python3-swh.objstorage (>= 0.0.17~),
          ${misc:Depends},
          ${python3:Depends}
 Description: Software Heritage storage utilities

 Package: python3-swh.storage.listener
 Architecture: all
 Depends: python3-swh.journal (>= 0.0.2~),
          python3-kafka (>= 1.3.1~),
          python3-swh.storage (= ${binary:Version}),
          ${misc:Depends},
          ${python3:Depends}
 Description: Software Heritage storage listener

-Package: python3-swh.storage.archiver
-Architecture: all
-Depends: python3-swh.scheduler (>= 0.0.14~),
-         python3-swh.journal,
-         python3-swh.storage (= ${binary:Version}),
-         ${misc:Depends},
-         ${python3:Depends}
-Description: Software Heritage storage Archiver
-
 Package: python3-swh.storage.provenance
 Architecture: all
 Depends: python3-swh.scheduler (>= 0.0.14~),
          python3-swh.storage (= ${binary:Version}),
          ${misc:Depends},
          ${python3:Depends}
 Description: Software Heritage storage Provenance
diff --git a/debian/rules b/debian/rules
index 5dbaab1b4..cf9189bd7 100755
--- a/debian/rules
+++ b/debian/rules
@@ -1,26 +1,23 @@
 #!/usr/bin/make -f

 export PYBUILD_NAME=swh.storage

 %:
 	dh $@ --with python3 --buildsystem=pybuild

 override_dh_install:
 	dh_install
 	for pyvers in $(shell py3versions -vr); do \
 	  mkdir -p $(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \
 	  mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/listener.py \
 	     $(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \
-	  mkdir -p $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver ; \
-	  mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/* \
-	     $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/ ; \
 	  mkdir -p $(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance ; \
 	  mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/* \
 	     $(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/ ; \
 	done

 override_dh_auto_test:
 	PYBUILD_SYSTEM=custom \
 	PYBUILD_TEST_ARGS="cd {build_dir}; python{version} -m nose swh -sva '!db'" \
 	dh_auto_test
diff --git a/requirements.txt b/requirements.txt
index 9e003ff34..1c4adeff9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,7 @@
 click
 flask
 psycopg2
 python-dateutil
-python-fastimport
 vcversioner
 kafka
 aiohttp
diff --git a/setup.py b/setup.py
index 7dc27a7b6..4ffd85e19 100755
--- a/setup.py
+++ b/setup.py
@@ -1,38 +1,37 @@
 #!/usr/bin/env python3

 from setuptools import setup


 def parse_requirements():
     requirements = []
     for reqf in ('requirements.txt', 'requirements-swh.txt'):
         with open(reqf) as f:
             for line in f.readlines():
                 line = line.strip()
                 if not line or line.startswith('#'):
                     continue
                 requirements.append(line)
     return requirements


 setup(
     name='swh.storage',
     description='Software Heritage storage manager',
     author='Software Heritage developers',
     author_email='swh-devel@inria.fr',
     url='https://forge.softwareheritage.org/diffusion/DSTO/',
     packages=[
         'swh.storage',
-        'swh.storage.archiver',
         'swh.storage.api',
         'swh.storage.provenance',
         'swh.storage.tests',
     ],
     scripts=[
         'bin/swh-storage-add-dir',
     ],
     install_requires=parse_requirements(),
     setup_requires=['vcversioner'],
     vcversioner={},
     include_package_data=True,
 )
diff --git a/sql/archiver/Makefile b/sql/archiver/Makefile
deleted file mode 100644
index c132dbcce..000000000
--- a/sql/archiver/Makefile
+++ /dev/null
@@ -1,42 +0,0 @@
-# Depends: postgresql-client, postgresql-autodoc
-
-DBNAME = softwareheritage-archiver-dev
-DOCDIR = autodoc
-
-SQL_INIT = ../swh-init.sql
-SQL_SCHEMA = swh-archiver-schema.sql
-SQL_FUNC = swh-archiver-func.sql
-SQL_DATA = swh-archiver-data.sql
-SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_DATA)
-
-PSQL_BIN = psql
-PSQL_FLAGS = --single-transaction --echo-all -X
-PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
-
-
-all:
-
-createdb: createdb-stamp
-createdb-stamp: $(SQL_INIT)
-	createdb $(DBNAME)
-	touch $@
-
-filldb: filldb-stamp
-filldb-stamp: createdb-stamp
-	cat $(SQLS) | $(PSQL) $(DBNAME)
-	touch $@
-
-dropdb:
-	-dropdb $(DBNAME)
-
-dumpdb: swh-archiver.dump
-swh.dump: filldb-stamp
-	pg_dump -Fc $(DBNAME) > $@
-
-clean:
-	rm -rf *-stamp $(DOCDIR)/
-
-distclean: clean dropdb
-	rm -f swh.dump
-
-.PHONY: all initdb createdb dropdb doc clean
diff --git a/sql/archiver/swh-archiver-data.sql b/sql/archiver/swh-archiver-data.sql
deleted file mode 100644
index e4a70a25e..000000000
--- a/sql/archiver/swh-archiver-data.sql
+++ /dev/null
@@ -1,3 +0,0 @@
-INSERT INTO archive(name) VALUES('uffizi');
-INSERT INTO archive(name) VALUES('banco');
-INSERT INTO archive(name) VALUES('azure');
diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql
deleted file mode 100644
index 750bfae2c..000000000
--- a/sql/archiver/swh-archiver-func.sql
+++ /dev/null
@@ -1,40 +0,0 @@
-create or replace function swh_mktemp_content()
-    returns void
-    language plpgsql
-as $$
-    begin
-      create temporary table tmp_content (
-        sha1 sha1 not null
-      ) on commit drop;
-      return;
-    end
-$$;
-
-create or replace function swh_content_copies_from_temp(archive_names text[])
-    returns void
-    language plpgsql
-as $$
-    begin
-      with existing_content_ids as (
-        select id
-        from content
-        inner join tmp_content on content.sha1 = tmp.sha1
-      ), created_content_ids as (
-        insert into content (sha1)
-        select sha1 from tmp_content
-        on conflict do nothing
-        returning id
-      ), content_ids as (
-        select * from existing_content_ids
-        union all
-        select * from created_content_ids
-      ), archive_ids as (
-        select id from archive
-        where name = any(archive_names)
-      ) insert into content_copies (content_id, archive_id, mtime, status)
-      select content_ids.id, archive_ids.id,
now(), 'present' - from content_ids cross join archive_ids - on conflict (content_id, archive_id) do update - set mtime = excluded.mtime, status = excluded.status; - end -$$; diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql deleted file mode 100644 index e5849585f..000000000 --- a/sql/archiver/swh-archiver-schema.sql +++ /dev/null @@ -1,63 +0,0 @@ --- In order to archive the content of the object storage, add --- some tables to keep trace of what have already been archived. - -create table dbversion -( - version int primary key, - release timestamptz, - description text -); - -comment on table dbversion is 'Schema update tracking'; - -INSERT INTO dbversion(version, release, description) -VALUES(10, now(), 'Work In Progress'); - -CREATE TABLE archive ( - id bigserial PRIMARY KEY, - name text not null -); - -create unique index on archive(name); - -comment on table archive is 'The archives in which contents are stored'; -comment on column archive.id is 'Short identifier for archives'; -comment on column archive.name is 'Name of the archive'; - -CREATE TYPE archive_status AS ENUM ( - 'missing', - 'ongoing', - 'present', - 'corrupted' -); - -comment on type archive_status is 'Status of a given copy of a content'; - --- a SHA1 checksum (not necessarily originating from Git) -CREATE DOMAIN sha1 AS bytea CHECK (LENGTH(VALUE) = 20); - --- a bucket for which we count items -CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2); - -create table content ( - id bigserial primary key, - sha1 sha1 not null -); - -comment on table content is 'All the contents being archived by Software Heritage'; -comment on column content.id is 'Short id for the content being archived'; -comment on column content.sha1 is 'SHA1 hash of the content being archived'; - -create unique index on content(sha1); - -create table content_copies ( - content_id bigint not null, -- references content(id) - archive_id bigint not null, -- references archive(id) - mtime timestamptz, - status archive_status not null, - primary key (content_id, archive_id) -); - -comment on table content_copies is 'Tracking of all content copies in the archives'; -comment on column content_copies.mtime is 'Last update time of the copy'; -comment on column content_copies.status is 'Status of the copy'; diff --git a/sql/archiver/upgrades/002.sql b/sql/archiver/upgrades/002.sql deleted file mode 100644 index d83db0281..000000000 --- a/sql/archiver/upgrades/002.sql +++ /dev/null @@ -1,9 +0,0 @@ --- SWH DB schema upgrade --- from_version: 1 --- to_version: 2 --- description: Add a 'corrupted' status into the archive_content status - -INSERT INTO dbversion(version, release, description) -VALUES(2, now(), 'Work In Progress'); - -ALTER TYPE archive_status ADD VALUE 'corrupted'; diff --git a/sql/archiver/upgrades/003.sql b/sql/archiver/upgrades/003.sql deleted file mode 100644 index ba43f5268..000000000 --- a/sql/archiver/upgrades/003.sql +++ /dev/null @@ -1,25 +0,0 @@ --- SWH DB schema upgrade --- from_version: 2 --- to_version: 3 --- description: Add a 'num_present' cache column into the archive_content status - -INSERT INTO dbversion(version, release, description) -VALUES(3, now(), 'Work In Progress'); - -alter table content_archive add column num_present int default null; -comment on column content_archive.num_present is 'Number of copies marked as present (cache updated via trigger)'; - -create index concurrently on content_archive(num_present); - --- Keep the num_copies cache updated -CREATE FUNCTION 
update_num_present() RETURNS TRIGGER AS $$ - BEGIN - NEW.num_present := (select count(*) from jsonb_each(NEW.copies) where value->>'status' = 'present'); - RETURN new; - END; -$$ LANGUAGE PLPGSQL; - -CREATE TRIGGER update_num_present - BEFORE INSERT OR UPDATE OF copies ON content_archive - FOR EACH ROW - EXECUTE PROCEDURE update_num_present(); diff --git a/sql/archiver/upgrades/004.sql b/sql/archiver/upgrades/004.sql deleted file mode 100644 index bfb5ad314..000000000 --- a/sql/archiver/upgrades/004.sql +++ /dev/null @@ -1,44 +0,0 @@ --- SWH DB schema upgrade --- from_version: 3 --- to_version: 4 --- description: Add azure instance - -INSERT INTO dbversion(version, release, description) -VALUES(4, now(), 'Work In Progress'); - -ALTER TABLE archive DROP COLUMN url; -ALTER TABLE archive ALTER COLUMN id SET DATA TYPE TEXT; - -INSERT INTO archive(id) VALUES ('azure'); - -create or replace function swh_mktemp_content_archive() - returns void - language sql -as $$ - create temporary table tmp_content_archive ( - like content_archive including defaults - ) on commit drop; - alter table tmp_content_archive drop column copies; - alter table tmp_content_archive drop column num_present; -$$; - -COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive'; - -create or replace function swh_content_archive_missing(backend_name text) - returns setof sha1 - language plpgsql -as $$ -begin - return query - select content_id - from tmp_content_archive tmp where exists ( - select 1 - from content_archive c - where tmp.content_id = c.content_id - and (not c.copies ? backend_name - or c.copies @> jsonb_build_object(backend_name, '{"status": "missing"}'::jsonb)) - ); -end -$$; - -COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend'; diff --git a/sql/archiver/upgrades/005.sql b/sql/archiver/upgrades/005.sql deleted file mode 100644 index bc50631c1..000000000 --- a/sql/archiver/upgrades/005.sql +++ /dev/null @@ -1,24 +0,0 @@ --- SWH DB schema upgrade --- from_version: 4 --- to_version: 5 --- description: List unknown sha1s from content_archive - -INSERT INTO dbversion(version, release, description) -VALUES(5, now(), 'Work In Progress'); - -create or replace function swh_content_archive_unknown() - returns setof sha1 - language plpgsql -as $$ -begin - return query - select content_id - from tmp_content_archive tmp where not exists ( - select 1 - from content_archive c - where tmp.content_id = c.content_id - ); -end -$$; - -COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1'; diff --git a/sql/archiver/upgrades/006.sql b/sql/archiver/upgrades/006.sql deleted file mode 100644 index d9d1b24ce..000000000 --- a/sql/archiver/upgrades/006.sql +++ /dev/null @@ -1,100 +0,0 @@ --- SWH DB schema upgrade --- from_version: 5 --- to_version: 6 --- description: Create a bucketed count of contents in the archive. 
- -INSERT INTO dbversion(version, release, description) -VALUES(6, now(), 'Work In Progress'); - --- a bucket for which we count items -CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2); - -CREATE TABLE content_archive_counts ( - archive text not null references archive(id), - bucket bucket not null, - count bigint, - primary key (archive, bucket) -); - -comment on table content_archive_counts is 'Bucketed count of archive contents'; -comment on column content_archive_counts.archive is 'the archive for which we''re counting'; -comment on column content_archive_counts.bucket is 'the bucket of items we''re counting'; -comment on column content_archive_counts.count is 'the number of items counted in the given bucket'; - - -CREATE FUNCTION count_copies(from_id bytea, to_id bytea) returns void language sql as $$ - with sample as ( - select content_id, copies from content_archive - where content_id > from_id and content_id <= to_id - ), data as ( - select substring(content_id from 19) as bucket, jbe.key as archive - from sample - join lateral jsonb_each(copies) jbe on true - where jbe.value->>'status' = 'present' - ), bucketed as ( - select bucket, archive, count(*) as count - from data - group by bucket, archive - ) update content_archive_counts cac set - count = cac.count + bucketed.count - from bucketed - where cac.archive = bucketed.archive and cac.bucket = bucketed.bucket; -$$; - -comment on function count_copies(bytea, bytea) is 'Count the objects between from_id and to_id, add the results to content_archive_counts'; - -CREATE FUNCTION init_content_archive_counts() returns void language sql as $$ - insert into content_archive_counts ( - select id, decode(lpad(to_hex(bucket), 4, '0'), 'hex')::bucket as bucket, 0 as count - from archive join lateral generate_series(0, 65535) bucket on true - ) on conflict (archive, bucket) do nothing; -$$; - -comment on function init_content_archive_counts() is 'Initialize the content archive counts for the registered archives'; - --- keep the content_archive_counts updated -CREATE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ - DECLARE - content_id sha1; - content_bucket bucket; - copies record; - old_row content_archive; - new_row content_archive; - BEGIN - -- default values for old or new row depending on trigger type - if tg_op = 'INSERT' then - old_row := (null::sha1, '{}'::jsonb, 0); - else - old_row := old; - end if; - if tg_op = 'DELETE' then - new_row := (null::sha1, '{}'::jsonb, 0); - else - new_row := new; - end if; - - -- get the content bucket - content_id := coalesce(old_row.content_id, new_row.content_id); - content_bucket := substring(content_id from 19)::bucket; - - -- compare copies present in old and new row for each archive type - FOR copies IN - select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status - from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key - LOOP - -- the count didn't change - CONTINUE WHEN copies.old_status is distinct from copies.new_status OR - (copies.old_status != 'present' AND copies.new_status != 'present'); - - update content_archive_counts cac - set count = count + (case when copies.old_status = 'present' then -1 else 1 end) - where archive = copies.archive and bucket = content_bucket; - END LOOP; - return null; - END; -$$; - -create trigger update_content_archive_counts - AFTER INSERT OR UPDATE OR DELETE ON content_archive - FOR EACH ROW - EXECUTE PROCEDURE 
update_content_archive_counts(); diff --git a/sql/archiver/upgrades/007.sql b/sql/archiver/upgrades/007.sql deleted file mode 100644 index 34049426e..000000000 --- a/sql/archiver/upgrades/007.sql +++ /dev/null @@ -1,21 +0,0 @@ --- SWH DB schema upgrade --- from_version: 6 --- to_version: 7 --- description: Add a function to compute archive counts - -INSERT INTO dbversion(version, release, description) -VALUES(7, now(), 'Work In Progress'); - -create type content_archive_count as ( - archive text, - count bigint -); - -create or replace function get_content_archive_counts() returns setof content_archive_count language sql as $$ - select archive, sum(count)::bigint - from content_archive_counts - group by archive - order by archive; -$$; - -comment on function get_content_archive_counts() is 'Get count for each archive'; diff --git a/sql/archiver/upgrades/008.sql b/sql/archiver/upgrades/008.sql deleted file mode 100644 index 6527aca69..000000000 --- a/sql/archiver/upgrades/008.sql +++ /dev/null @@ -1,49 +0,0 @@ --- SWH DB schema upgrade --- from_version: 7 --- to_version: 8 --- description: Fix silly bug in update_content_archive_counts - -INSERT INTO dbversion(version, release, description) -VALUES(8, now(), 'Work In Progress'); - --- keep the content_archive_counts updated -CREATE OR REPLACE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ - DECLARE - content_id sha1; - content_bucket bucket; - copies record; - old_row content_archive; - new_row content_archive; - BEGIN - -- default values for old or new row depending on trigger type - if tg_op = 'INSERT' then - old_row := (null::sha1, '{}'::jsonb, 0); - else - old_row := old; - end if; - if tg_op = 'DELETE' then - new_row := (null::sha1, '{}'::jsonb, 0); - else - new_row := new; - end if; - - -- get the content bucket - content_id := coalesce(old_row.content_id, new_row.content_id); - content_bucket := substring(content_id from 19)::bucket; - - -- compare copies present in old and new row for each archive type - FOR copies IN - select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status - from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key - LOOP - -- the count didn't change - CONTINUE WHEN copies.old_status is not distinct from copies.new_status OR - (copies.old_status != 'present' AND copies.new_status != 'present'); - - update content_archive_counts cac - set count = count + (case when copies.old_status = 'present' then -1 else 1 end) - where archive = copies.archive and bucket = content_bucket; - END LOOP; - return null; - END; -$$; diff --git a/sql/archiver/upgrades/009.sql b/sql/archiver/upgrades/009.sql deleted file mode 100644 index 5a3133bab..000000000 --- a/sql/archiver/upgrades/009.sql +++ /dev/null @@ -1,42 +0,0 @@ --- SWH Archiver DB schema upgrade --- from_version: 8 --- to_version: 9 --- description: Add helper functions to create temporary table and insert new entries in content_archive table - -insert into dbversion(version, release, description) -values(9, now(), 'Work In Progress'); - --- create a temporary table called tmp_TBLNAME, mimicking existing --- table TBLNAME -create or replace function swh_mktemp(tblname regclass) - returns void - language plpgsql -as $$ -begin - execute format(' - create temporary table tmp_%1$I - (like %1$I including defaults) - on commit drop; - ', tblname); - return; -end -$$; - -comment on function swh_mktemp(regclass) is 'Helper function to create a 
temporary table mimicking the existing one'; - --- Helper function to insert new entries in content_archive from a --- temporary table skipping duplicates. -create or replace function swh_content_archive_add() - returns void - language plpgsql -as $$ -begin - insert into content_archive (content_id, copies, num_present) - select distinct content_id, copies, num_present - from tmp_content_archive - on conflict(content_id) do nothing; - return; -end -$$; - -comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive'; diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 359299e12..6a2696938 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.87 +Version: 0.0.88 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt index b6d644006..6a91c3ce8 100644 --- a/swh.storage.egg-info/SOURCES.txt +++ b/swh.storage.egg-info/SOURCES.txt @@ -1,194 +1,173 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile Makefile.local README.db_testing README.dev requirements-swh.txt requirements.txt setup.py version.txt bin/swh-storage-add-dir debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/archiver-blueprint.md sql/.gitignore sql/Makefile sql/TODO sql/clusters.dot sql/swh-data.sql sql/swh-enums.sql sql/swh-func.sql sql/swh-indexes.sql sql/swh-init.sql sql/swh-schema.sql sql/swh-triggers.sql -sql/archiver/Makefile -sql/archiver/swh-archiver-data.sql -sql/archiver/swh-archiver-func.sql -sql/archiver/swh-archiver-schema.sql -sql/archiver/upgrades/002.sql -sql/archiver/upgrades/003.sql -sql/archiver/upgrades/004.sql -sql/archiver/upgrades/005.sql -sql/archiver/upgrades/006.sql -sql/archiver/upgrades/007.sql -sql/archiver/upgrades/008.sql -sql/archiver/upgrades/009.sql sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/entity.lister_metadata.schema.json sql/doc/json/entity.metadata.schema.json sql/doc/json/entity_history.lister_metadata.schema.json sql/doc/json/entity_history.metadata.schema.json sql/doc/json/fetch_history.result.schema.json sql/doc/json/indexer_configuration.tool_configuration.schema.json sql/doc/json/list_history.result.schema.json sql/doc/json/listable_entity.list_params.schema.json sql/doc/json/origin_visit.metadata.json sql/doc/json/revision.metadata.schema.json sql/json/.gitignore sql/json/Makefile sql/json/entity.lister_metadata.schema.json sql/json/entity.metadata.schema.json sql/json/entity_history.lister_metadata.schema.json sql/json/entity_history.metadata.schema.json sql/json/fetch_history.result.schema.json sql/json/indexer_configuration.tool_configuration.schema.json sql/json/list_history.result.schema.json sql/json/listable_entity.list_params.schema.json sql/json/origin_visit.metadata.json sql/json/revision.metadata.schema.json sql/upgrades/015.sql sql/upgrades/016.sql sql/upgrades/017.sql sql/upgrades/018.sql sql/upgrades/019.sql sql/upgrades/020.sql sql/upgrades/021.sql sql/upgrades/022.sql sql/upgrades/023.sql sql/upgrades/024.sql sql/upgrades/025.sql sql/upgrades/026.sql sql/upgrades/027.sql sql/upgrades/028.sql sql/upgrades/029.sql 
sql/upgrades/030.sql sql/upgrades/032.sql sql/upgrades/033.sql sql/upgrades/034.sql sql/upgrades/035.sql sql/upgrades/036.sql sql/upgrades/037.sql sql/upgrades/038.sql sql/upgrades/039.sql sql/upgrades/040.sql sql/upgrades/041.sql sql/upgrades/042.sql sql/upgrades/043.sql sql/upgrades/044.sql sql/upgrades/045.sql sql/upgrades/046.sql sql/upgrades/047.sql sql/upgrades/048.sql sql/upgrades/049.sql sql/upgrades/050.sql sql/upgrades/051.sql sql/upgrades/052.sql sql/upgrades/053.sql sql/upgrades/054.sql sql/upgrades/055.sql sql/upgrades/056.sql sql/upgrades/057.sql sql/upgrades/058.sql sql/upgrades/059.sql sql/upgrades/060.sql sql/upgrades/061.sql sql/upgrades/062.sql sql/upgrades/063.sql sql/upgrades/064.sql sql/upgrades/065.sql sql/upgrades/066.sql sql/upgrades/067.sql sql/upgrades/068.sql sql/upgrades/069.sql sql/upgrades/070.sql sql/upgrades/071.sql sql/upgrades/072.sql sql/upgrades/073.sql sql/upgrades/074.sql sql/upgrades/075.sql sql/upgrades/076.sql sql/upgrades/077.sql sql/upgrades/078.sql sql/upgrades/079.sql sql/upgrades/080.sql sql/upgrades/081.sql sql/upgrades/082.sql sql/upgrades/083.sql sql/upgrades/084.sql sql/upgrades/085.sql sql/upgrades/086.sql sql/upgrades/087.sql sql/upgrades/088.sql sql/upgrades/089.sql sql/upgrades/090.sql sql/upgrades/091.sql sql/upgrades/092.sql sql/upgrades/093.sql sql/upgrades/094.sql sql/upgrades/095.sql sql/upgrades/096.sql sql/upgrades/097.sql sql/upgrades/098.sql sql/upgrades/099.sql sql/upgrades/100.sql sql/upgrades/101.sql sql/upgrades/102.sql sql/upgrades/103.sql sql/upgrades/104.sql sql/upgrades/105.sql sql/upgrades/106.sql sql/upgrades/107.sql swh.storage.egg-info/PKG-INFO swh.storage.egg-info/SOURCES.txt swh.storage.egg-info/dependency_links.txt swh.storage.egg-info/requires.txt swh.storage.egg-info/top_level.txt swh/storage/__init__.py swh/storage/common.py swh/storage/converters.py swh/storage/db.py swh/storage/exc.py swh/storage/listener.py swh/storage/storage.py swh/storage/api/__init__.py swh/storage/api/client.py swh/storage/api/server.py -swh/storage/archiver/__init__.py -swh/storage/archiver/copier.py -swh/storage/archiver/db.py -swh/storage/archiver/director.py -swh/storage/archiver/storage.py -swh/storage/archiver/tasks.py -swh/storage/archiver/updater.py -swh/storage/archiver/worker.py swh/storage/provenance/tasks.py swh/storage/tests/server_testing.py swh/storage/tests/test_api_client.py -swh/storage/tests/test_archiver.py swh/storage/tests/test_converters.py swh/storage/tests/test_db.py swh/storage/tests/test_storage.py utils/dump_revisions.py utils/fix_revisions_from_dump.py \ No newline at end of file diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index e0e8119c7..e6dc7cf63 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,13 +1,12 @@ aiohttp click flask kafka psycopg2 python-dateutil -python-fastimport swh.core>=0.0.28 swh.journal>=0.0.2 swh.model>=0.0.15 swh.objstorage>=0.0.17 swh.scheduler>=0.0.14 vcversioner diff --git a/swh/storage/archiver/__init__.py b/swh/storage/archiver/__init__.py deleted file mode 100644 index 2ff1cce13..000000000 --- a/swh/storage/archiver/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .director import ArchiverWithRetentionPolicyDirector # NOQA -from .director import ArchiverStdinToBackendDirector # NOQA -from .worker import ArchiverWithRetentionPolicyWorker # NOQA -from .worker import ArchiverToBackendWorker # NOQA -from .copier import ArchiverCopier # NOQA diff --git a/swh/storage/archiver/copier.py 
b/swh/storage/archiver/copier.py deleted file mode 100644 index e17a2f1bd..000000000 --- a/swh/storage/archiver/copier.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -import logging - -from swh.objstorage.exc import ObjNotFoundError -from swh.model import hashutil - -logger = logging.getLogger('archiver.worker.copier') - - -class ArchiverCopier(): - """ This archiver copy some files into a remote objstorage - in order to get a backup. - """ - def __init__(self, source, destination, content_ids): - """ Create a Copier for the archiver - - Args: - source (ObjStorage): source storage to get the contents. - destination (ObjStorage): Storage where the contents will - be copied. - content_ids: list of content's id to archive. - """ - self.source = source - self.destination = destination - self.content_ids = content_ids - - def run(self): - """ Do the copy on the backup storage. - - Run the archiver copier in order to copy the required content - into the current destination. - The content which corresponds to the sha1 in self.content_ids - will be fetched from the master_storage and then copied into - the backup object storage. - - Returns: - A boolean that indicates if the whole content have been copied. - """ - try: - for content_id in self.content_ids: - try: - content = self.source.get(content_id) - except ObjNotFoundError: - logging.error('content %s not found' % - hashutil.hash_to_hex(content_id)) - continue - self.destination.add(content, content_id) - except Exception as e: - logging.error('Problem during copy: %s' % e) - return False - return True diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py deleted file mode 100644 index d9e5e4399..000000000 --- a/swh/storage/archiver/db.py +++ /dev/null @@ -1,224 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime - -from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure - - -def utcnow(): - return datetime.datetime.now(tz=datetime.timezone.utc) - - -class ArchiverDb(BaseDb): - """Proxy to the SWH's archiver DB - - """ - - def archive_ls(self, cur=None): - """ Get all the archives registered on the server. - - Yields: - a tuple (server_id, server_url) for each archive server. - """ - cur = self._cursor(cur) - cur.execute("SELECT * FROM archive") - yield from cursor_to_bytes(cur) - - def content_archive_get(self, content_id, cur=None): - """ Get the archival status of a content in a specific server. - - Retrieve from the database the archival status of the given content - in the given archive server. - - Args: - content_id: the sha1 of the content. - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. 
- """ - query = """select archive.name, status, mtime - from content_copies - left join archive on content_copies.archive_id = archive.id - where content_copies.content_id = ( - select id from content where sha1 = %s) - """ - cur = self._cursor(cur) - cur.execute(query, (content_id,)) - rows = cur.fetchall() - if not rows: - return None - present = [] - ongoing = {} - for archive, status, mtime in rows: - if status == 'present': - present.append(archive) - elif status == 'ongoing': - ongoing[archive] = mtime - return (content_id, present, ongoing) - - def content_archive_get_copies(self, last_content=None, limit=1000, - cur=None): - """Get the list of copies for `limit` contents starting after - `last_content`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - - vars = { - 'limit': limit, - } - - if last_content is None: - last_content_clause = 'true' - else: - last_content_clause = """content_id > ( - select id from content - where sha1 = %(last_content)s)""" - vars['last_content'] = last_content - - query = """select - (select sha1 from content where id = content_id), - array_agg((select name from archive - where id = archive_id)) - from content_copies - where status = 'present' and %s - group by content_id - order by content_id - limit %%(limit)s""" % last_content_clause - - cur = self._cursor(cur) - cur.execute(query, vars) - for content_id, present in cursor_to_bytes(cur): - yield (content_id, present, {}) - - def content_archive_get_unarchived_copies( - self, retention_policy, last_content=None, - limit=1000, cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. Yields only copies with number of present - smaller than `retention policy`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - retention_policy: number of required present copies - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - - vars = { - 'limit': limit, - 'retention_policy': retention_policy, - } - - if last_content is None: - last_content_clause = 'true' - else: - last_content_clause = """content_id > ( - select id from content - where sha1 = %(last_content)s)""" - vars['last_content'] = last_content - - query = """select - (select sha1 from content where id = content_id), - array_agg((select name from archive - where id = archive_id)) - from content_copies - where status = 'present' and %s - group by content_id - having count(archive_id) < %%(retention_policy)s - order by content_id - limit %%(limit)s""" % last_content_clause - - cur = self._cursor(cur) - cur.execute(query, vars) - for content_id, present in cursor_to_bytes(cur): - yield (content_id, present, {}) - - @stored_procedure('swh_mktemp_content_archive') - def mktemp_content_archive(self, cur=None): - """Trigger the creation of the temporary table tmp_content_archive - during the lifetime of the transaction. - - """ - pass - - @stored_procedure('swh_content_archive_add') - def content_archive_add_from_temp(self, cur=None): - """Add new content archive entries from temporary table. 
- - Use from archiver.storage module: - self.db.mktemp_content_archive() - # copy data over to the temp table - self.db.copy_to([{'colname': id0}, {'colname': id1}], - 'tmp_cache_content', - ['colname'], cur) - # insert into the main table - self.db.add_content_archive_from_temp(cur) - - """ - pass - - def content_archive_get_missing(self, backend_name, cur=None): - """Retrieve the content missing from backend_name. - - """ - cur = self._cursor(cur) - cur.execute("select * from swh_content_archive_missing(%s)", - (backend_name,)) - yield from cursor_to_bytes(cur) - - def content_archive_get_unknown(self, cur=None): - """Retrieve unknown sha1 from archiver db. - - """ - cur = self._cursor(cur) - cur.execute('select * from swh_content_archive_unknown()') - yield from cursor_to_bytes(cur) - - def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): - """ Update the status of an archive content and set its mtime to - - Change the mtime of an archived content for the given archive and set - it's mtime to the current time. - - Args: - content_id (str): content sha1 - archive_id (str): name of the archive - new_status (str): one of 'missing', 'present' or 'ongoing'. - this status will replace the previous one. If not given, - the function only change the mtime of the content for the - given archive. - """ - assert isinstance(content_id, bytes) - assert new_status is not None - - query = """insert into content_copies (archive_id, content_id, status, mtime) - values ((select id from archive where name=%s), - (select id from content where sha1=%s), - %s, %s) - on conflict (archive_id, content_id) do - update set status = excluded.status, mtime = excluded.mtime - """ - - cur = self._cursor(cur) - cur.execute(query, (archive_id, content_id, new_status, utcnow())) diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py deleted file mode 100644 index f707d24c5..000000000 --- a/swh/storage/archiver/director.py +++ /dev/null @@ -1,307 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import abc -import click -import sys - -from swh.core import config, utils -from swh.model import hashutil -from swh.objstorage import get_objstorage -from swh.scheduler.utils import get_task - -from . import tasks # noqa -from .storage import get_archiver_storage - - -class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta): - """Abstract Director class - - An archiver director is in charge of dispatching batch of - contents to archiver workers (for them to archive). 
- - Inherit from this class and provide: - - ADDITIONAL_CONFIG: Some added configuration needed for the - director to work - - CONFIG_BASE_FILENAME: relative path to lookup for the - configuration file - - def get_contents_to_archive(self): Implementation method to read - contents to archive - - """ - DEFAULT_CONFIG = { - 'batch_max_size': ('int', 1500), - 'asynchronous': ('bool', True), - - 'archiver_storage': ('dict', { - 'cls': 'db', - 'args': { - 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest', - }, - }), - } - - # Destined to be overridden by subclass - ADDITIONAL_CONFIG = {} - - # We use the same configuration file as the worker - CONFIG_BASE_FILENAME = 'archiver/worker' - - # The worker's task queue name to use - TASK_NAME = None - - def __init__(self): - """ Constructor of the archiver director. - - Args: - db_conn_archiver: Either a libpq connection string, - or a psycopg2 connection for the archiver db. - config: optionnal additional configuration. Keys in the dict will - override the one parsed from the configuration file. - """ - super().__init__() - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - self.archiver_storage = get_archiver_storage( - **self.config['archiver_storage']) - self.task = get_task(self.TASK_NAME) - - def run(self): - """ Run the archiver director. - - The archiver director will check all the contents of the archiver - database and do the required backup jobs. - """ - if self.config['asynchronous']: - run_fn = self.run_async_worker - else: - run_fn = self.run_sync_worker - - for batch in self.read_batch_contents(): - run_fn(batch) - - def run_async_worker(self, batch): - """Produce a worker that will be added to the task queue. - - """ - self.task.delay(batch=batch) - - def run_sync_worker(self, batch): - """Run synchronously a worker on the given batch. - - """ - self.task(batch=batch) - - def read_batch_contents(self): - """ Create batch of contents that needs to be archived - - Yields: - batch of sha1 that corresponds to contents that needs more archive - copies. - """ - contents = [] - for content in self.get_contents_to_archive(): - contents.append(content) - if len(contents) > self.config['batch_max_size']: - yield contents - contents = [] - if len(contents) > 0: - yield contents - - @abc.abstractmethod - def get_contents_to_archive(self): - """Retrieve generator of sha1 to archive - - Yields: - sha1 to archive - - """ - pass - - -class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase): - """Process the files in order to know which one is needed as backup. - - The archiver director processes the files in the local storage in order - to know which one needs archival and it delegates this task to - archiver workers. - """ - - ADDITIONAL_CONFIG = { - 'retention_policy': ('int', 2), - } - - TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask' - - def get_contents_to_archive(self): - """Create batch of contents that needs to be archived - - Yields: - Datas about a content as a tuple - (content_id, present_copies, ongoing_copies) where ongoing_copies - is a dict mapping copy to mtime. 
- - """ - last_content = None - while True: - archiver_contents = list( - self.archiver_storage.content_archive_get_unarchived_copies( - last_content=last_content, - retention_policy=self.config['retention_policy'], - limit=self.config['batch_max_size'])) - if not archiver_contents: - return - for content_id, _, _ in archiver_contents: - last_content = content_id - yield content_id - - -def read_sha1_from_stdin(): - """Read sha1 from stdin. - - """ - for line in sys.stdin: - sha1 = line.strip() - try: - yield hashutil.hash_to_bytes(sha1) - except Exception: - print("%s is not a valid sha1 hash, continuing" % repr(sha1), - file=sys.stderr) - continue - - -class ArchiverStdinToBackendDirector(ArchiverDirectorBase): - """A cloud archiver director in charge of reading contents and send - them in batch in the cloud. - - The archiver director, in order: - - Reads sha1 to send to a specific backend. - - Checks if those sha1 are known in the archiver. If they are not, - add them - - if the sha1 are missing, they are sent for the worker to archive - - If the flag force_copy is set, this will force the copy to be sent - for archive even though it has already been done. - - """ - ADDITIONAL_CONFIG = { - 'destination': ('str', 'azure'), - 'force_copy': ('bool', False), - 'source': ('str', 'uffizi'), - 'storages': ('list[dict]', - [ - {'host': 'uffizi', - 'cls': 'pathslicing', - 'args': {'root': '/tmp/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}, - {'host': 'banco', - 'cls': 'remote', - 'args': {'base_url': 'http://banco:5003/'}} - ]) - } - - CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' - - TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask' - - def __init__(self): - super().__init__() - self.destination = self.config['destination'] - self.force_copy = self.config['force_copy'] - self.objstorages = { - storage['host']: get_objstorage(storage['cls'], storage['args']) - for storage in self.config.get('storages', []) - } - # Fallback objstorage - self.source = self.config['source'] - - def _add_unknown_content_ids(self, content_ids): - """Check whether some content_id are unknown. - If they are, add them to the archiver db. 
- - Args: - content_ids: List of dict with one key content_id - - """ - source_objstorage = self.objstorages[self.source] - - self.archiver_storage.content_archive_add( - (h - for h in content_ids - if h in source_objstorage), - sources_present=[self.source]) - - def get_contents_to_archive(self): - gen_content_ids = ( - ids for ids in utils.grouper(read_sha1_from_stdin(), - self.config['batch_max_size'])) - - if self.force_copy: - for content_ids in gen_content_ids: - content_ids = list(content_ids) - - if not content_ids: - continue - - # Add missing entries in archiver table - self._add_unknown_content_ids(content_ids) - - print('Send %s contents to archive' % len(content_ids)) - - for content_id in content_ids: - # force its status to missing - self.archiver_storage.content_archive_update( - content_id, self.destination, 'missing') - yield content_id - - else: - for content_ids in gen_content_ids: - content_ids = list(content_ids) - - # Add missing entries in archiver table - self._add_unknown_content_ids(content_ids) - - # Filter already copied data - content_ids = list( - self.archiver_storage.content_archive_get_missing( - content_ids=content_ids, - backend_name=self.destination)) - - if not content_ids: - continue - - print('Send %s contents to archive' % len(content_ids)) - - for content in content_ids: - yield content - - def run_async_worker(self, batch): - """Produce a worker that will be added to the task queue. - - """ - self.task.delay(destination=self.destination, batch=batch) - - def run_sync_worker(self, batch): - """Run synchronously a worker on the given batch. - - """ - self.task(destination=self.destination, batch=batch) - - -@click.command() -@click.option('--direct', is_flag=True, - help="""The archiver sends content for backup to -one storage.""") -def launch(direct): - if direct: - archiver = ArchiverStdinToBackendDirector() - else: - archiver = ArchiverWithRetentionPolicyDirector() - - archiver.run() - - -if __name__ == '__main__': - launch() diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py deleted file mode 100644 index 977105825..000000000 --- a/swh/storage/archiver/storage.py +++ /dev/null @@ -1,361 +0,0 @@ -# Copyright (C) 2016-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import json -import os -import psycopg2 -import time - -from .db import ArchiverDb - -from swh.model import hashutil -from swh.storage.common import db_transaction_generator, db_transaction -from swh.storage.exc import StorageDBError - - -class ArchiverStorage(): - """SWH Archiver storage proxy, encompassing DB - - """ - def __init__(self, dbconn): - """ - Args: - db_conn: either a libpq connection string, or a psycopg2 connection - - """ - try: - if isinstance(dbconn, psycopg2.extensions.connection): - self.db = ArchiverDb(dbconn) - else: - self.db = ArchiverDb.connect(dbconn) - except psycopg2.OperationalError as e: - raise StorageDBError(e) - - @db_transaction_generator - def archive_ls(self, cur=None): - """ Get all the archives registered on the server. - - Yields: - a tuple (server_id, server_url) for each archive server. - """ - yield from self.db.archive_ls(cur) - - @db_transaction - def content_archive_get(self, content_id, cur=None): - """ Get the archival status of a content. 
- - Retrieve from the database the archival status of the given content - - Args: - content_id: the sha1 of the content - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - """ - return self.db.content_archive_get(content_id, cur) - - @db_transaction_generator - def content_archive_get_copies(self, last_content=None, limit=1000, - cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - yield from self.db.content_archive_get_copies(last_content, limit, - cur) - - @db_transaction_generator - def content_archive_get_unarchived_copies( - self, retention_policy, last_content=None, - limit=1000, cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. Yields only copies with number of present - smaller than `retention policy`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - retention_policy: number of required present copies - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - yield from self.db.content_archive_get_unarchived_copies( - retention_policy, last_content, limit, cur) - - @db_transaction_generator - def content_archive_get_missing(self, content_ids, backend_name, cur=None): - """Retrieve missing sha1s from source_name. - - Args: - content_ids ([sha1s]): list of sha1s to test - source_name (str): Name of the backend to check for content - - Yields: - missing sha1s from backend_name - - """ - db = self.db - - db.mktemp_content_archive() - - db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) - - for content_id in db.content_archive_get_missing(backend_name, cur): - yield content_id[0] - - @db_transaction_generator - def content_archive_get_unknown(self, content_ids, cur=None): - """Retrieve unknown sha1s from content_archive. - - Args: - content_ids ([sha1s]): list of sha1s to test - - Yields: - Unknown sha1s from content_archive - - """ - db = self.db - - db.mktemp_content_archive() - - db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) - - for content_id in db.content_archive_get_unknown(cur): - yield content_id[0] - - @db_transaction - def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): - """ Update the status of an archive content and set its mtime to now - - Change the mtime of an archived content for the given archive and set - it's mtime to the current time. - - Args: - content_id (str): content sha1 - archive_id (str): name of the archive - new_status (str): one of 'missing', 'present' or 'ongoing'. - this status will replace the previous one. If not given, - the function only change the mtime of the content for the - given archive. - """ - self.db.content_archive_update(content_id, archive_id, new_status, cur) - - @db_transaction - def content_archive_add( - self, content_ids, sources_present, cur=None): - """Insert a new entry in db about content_id. 
- - Args: - content_ids ([bytes|str]): content identifiers - sources_present ([str]): List of source names where - contents are present - """ - db = self.db - - # Prepare copies dictionary - copies = {} - for source in sources_present: - copies[source] = { - "status": "present", - "mtime": int(time.time()), - } - - copies = json.dumps(copies) - num_present = len(sources_present) - - db.mktemp('content_archive') - db.copy_to( - ({'content_id': id, - 'copies': copies, - 'num_present': num_present} - for id in content_ids), - 'tmp_content_archive', - ['content_id', 'copies', 'num_present'], - cur) - db.content_archive_add_from_temp(cur) - - -class StubArchiverStorage(): - def __init__(self, archives, present, missing, logfile_base): - """ - A stub storage for the archiver that doesn't write to disk - - Args: - - archives: a dictionary mapping archive names to archive URLs - - present: archives where the objects are all considered present - - missing: archives where the objects are all considered missing - - logfile_base: basename for the logfile - """ - self.archives = archives - self.present = set(present) - self.missing = set(missing) - if set(archives) != self.present | self.missing: - raise ValueError("Present and missing archives don't match") - self.logfile_base = logfile_base - self.__logfile = None - - def open_logfile(self): - if self.__logfile: - return - - logfile_name = "%s.%d" % (self.logfile_base, os.getpid()) - self.__logfile = open(logfile_name, 'a') - - def close_logfile(self): - if not self.__logfile: - return - - self.__logfile.close() - self.__logfile = None - - def archive_ls(self, cur=None): - """ Get all the archives registered on the server. - - Yields: - a tuple (server_id, server_url) for each archive server. - """ - yield from self.archives.items() - - def content_archive_get(self, content_id, cur=None): - """ Get the archival status of a content. - - Retrieve from the database the archival status of the given content - - Args: - content_id: the sha1 of the content - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - """ - return (content_id, self.present, {}) - - def content_archive_get_copies(self, last_content=None, limit=1000, - cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - yield from [] - - def content_archive_get_unarchived_copies(self, retention_policy, - last_content=None, limit=1000, - cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. Yields only copies with number of present - smaller than `retention policy`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - retention_policy: number of required present copies - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - yield from [] - - def content_archive_get_missing(self, content_ids, backend_name, cur=None): - """Retrieve missing sha1s from source_name. 
- - Args: - content_ids ([sha1s]): list of sha1s to test - source_name (str): Name of the backend to check for content - - Yields: - missing sha1s from backend_name - - """ - if backend_name in self.missing: - yield from content_ids - elif backend_name in self.present: - yield from [] - else: - raise ValueError('Unknown backend `%s`' % backend_name) - - def content_archive_get_unknown(self, content_ids, cur=None): - """Retrieve unknown sha1s from content_archive. - - Args: - content_ids ([sha1s]): list of sha1s to test - - Yields: - Unknown sha1s from content_archive - - """ - yield from [] - - def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): - """ Update the status of an archive content and set its mtime to now - - Change the mtime of an archived content for the given archive and set - it's mtime to the current time. - - Args: - content_id (str): content sha1 - archive_id (str): name of the archive - new_status (str): one of 'missing', 'present' or 'ongoing'. - this status will replace the previous one. If not given, - the function only change the mtime of the content for the - given archive. - """ - if not self.__logfile: - self.open_logfile() - - print(time.time(), archive_id, new_status, - hashutil.hash_to_hex(content_id), file=self.__logfile) - - def content_archive_add( - self, content_ids, sources_present, cur=None): - """Insert a new entry in db about content_id. - - Args: - content_ids ([bytes|str]): content identifiers - sources_present ([str]): List of source names where - contents are present - """ - pass - - -def get_archiver_storage(cls, args): - """Instantiate an archiver database with the proper class and arguments""" - if cls == 'db': - return ArchiverStorage(**args) - elif cls == 'stub': - return StubArchiverStorage(**args) - else: - raise ValueError('Unknown Archiver Storage class `%s`' % cls) diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py deleted file mode 100644 index ccb0a2f63..000000000 --- a/swh/storage/archiver/tasks.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (C) 2015-2016 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from swh.scheduler.task import Task -from .worker import ArchiverWithRetentionPolicyWorker -from .worker import ArchiverToBackendWorker - - -class SWHArchiverWithRetentionPolicyTask(Task): - """Main task that archive a batch of content. - - """ - task_queue = 'swh_storage_archive_worker' - - def run_task(self, *args, **kwargs): - ArchiverWithRetentionPolicyWorker(*args, **kwargs).run() - - -class SWHArchiverToBackendTask(Task): - """Main task that archive a batch of content in the cloud. 
- - """ - task_queue = 'swh_storage_archive_worker_to_backend' - - def run_task(self, *args, **kwargs): - ArchiverToBackendWorker(*args, **kwargs).run() diff --git a/swh/storage/archiver/updater.py b/swh/storage/archiver/updater.py deleted file mode 100644 index 64cf1cea4..000000000 --- a/swh/storage/archiver/updater.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import logging - -from swh.journal.client import SWHJournalClient - -from .storage import get_archiver_storage - - -class SWHArchiverContentUpdater(SWHJournalClient): - """Client in charge of updating new contents in the content_archiver - db. - - This is a swh.journal client only dealing with contents. - - """ - CONFIG_BASE_FILENAME = 'archiver/content_updater' - - ADDITIONAL_CONFIG = { - 'archiver_storage': ( - 'dict', { - 'cls': 'db', - 'args': { - 'dbconn': 'dbname=softwareheritage-archiver-dev ' - 'user=guest', - } - }), - 'sources_present': ('list[str]', ['uffizi']) - } - - def __init__(self): - # Only interested in content here so override the configuration - super().__init__(extra_configuration={'object_types': ['content']}) - - self.sources_present = self.config['sources_present'] - - self.archiver_storage = get_archiver_storage( - **self.config['archiver_storage']) - - def process_objects(self, messages): - self.archiver_storage.content_archive_add( - (c[b'sha1'] for c in messages['content']), - self.sources_present) - - -if __name__ == '__main__': - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s %(process)d %(levelname)s %(message)s' - ) - - content_updater = SWHArchiverContentUpdater() - content_updater.process() diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py deleted file mode 100644 index e873c34e2..000000000 --- a/swh/storage/archiver/worker.py +++ /dev/null @@ -1,426 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import abc -import logging -import random -import time - -from collections import defaultdict -from celery import group - -from swh.core import config, utils -from swh.objstorage import get_objstorage -from swh.objstorage.exc import Error, ObjNotFoundError -from swh.model import hashutil -from swh.scheduler.utils import get_task - -from .storage import get_archiver_storage -from .copier import ArchiverCopier - - -logger = logging.getLogger('archiver.worker') - - -class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta): - """Base archive worker. 
- - Inherit from this class and override: - - ADDITIONAL_CONFIG: Some added configuration needed for the - director to work - - CONFIG_BASE_FILENAME: relative path to lookup for the - configuration file - - def need_archival(self, content_data): Determine if a content - needs archival or not - - def choose_backup_servers(self, present, missing): Choose - which backup server to send copies to - - """ - DEFAULT_CONFIG = { - 'archiver_storage': ('dict', { - 'cls': 'db', - 'args': { - 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest', - }, - }), - 'storages': ('list[dict]', - [ - {'host': 'uffizi', - 'cls': 'pathslicing', - 'args': {'root': '/tmp/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}, - {'host': 'banco', - 'cls': 'remote', - 'args': {'base_url': 'http://banco:5003/'}} - ]) - } - - ADDITIONAL_CONFIG = {} - - CONFIG_BASE_FILENAME = 'archiver/worker' - - objstorages = {} - - def __init__(self, batch): - super().__init__() - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - self.batch = batch - self.archiver_db = get_archiver_storage( - **self.config['archiver_storage']) - self.objstorages = { - storage['host']: get_objstorage(storage['cls'], storage['args']) - for storage in self.config.get('storages', []) - } - self.set_objstorages = set(self.objstorages) - - def run(self): - """Do the task expected from the archiver worker. - - Process the contents in self.batch, ensure that the elements - still need an archival (using archiver db), and spawn copiers - to copy files in each destination according to the - archiver-worker's policy. - - """ - transfers = defaultdict(list) - for obj_id in self.batch: - # Get dict {'missing': [servers], 'present': [servers]} - # for contents ignoring those who don't need archival. - copies = self.compute_copies(self.set_objstorages, obj_id) - if not copies: # could not happen if using .director module - msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id) - logger.warning(msg) - continue - - if not self.need_archival(copies): - continue - - present = copies.get('present', set()) - missing = copies.get('missing', set()) - if len(present) == 0: - msg = 'Lost content %s' % hashutil.hash_to_hex(obj_id) - logger.critical(msg) - continue - - # Choose servers to be used as srcs and dests. - for src_dest in self.choose_backup_servers(present, missing): - transfers[src_dest].append(obj_id) - - # Then run copiers for each of the required transfers. - contents_copied = [] - for (src, dest), content_ids in transfers.items(): - contents_copied.extend(self.run_copier(src, dest, content_ids)) - - # copy is done, eventually do something else with them - self.copy_finished(contents_copied) - - def compute_copies(self, set_objstorages, content_id): - """From a content_id, return present and missing copies. - - Args: - objstorages (set): objstorage's id name - content_id: the content concerned - - Returns: - A dictionary with the following keys: - - 'present': set of archives where the content is present - - 'missing': set of archives where the content is missing - - 'ongoing': ongoing copies: dict mapping the archive id - with the time the copy supposedly started. 
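# [Editor's note, illustrative sketch] The dict described above, as consumed by
# need_archival() and choose_backup_servers(); the archive names come from the
# default 'storages' configuration of this worker:
copies = {
    'present': {'uffizi'},     # archives currently holding the content
    'missing': {'banco'},      # archives still lacking a copy
    'ongoing': {},             # archive name -> copy start time (epoch float)
}
# With the default retention_policy of 2 (and no fresh ongoing copy, which the
# retention-policy worker would also count), this content still needs archival:
needs_archival = len(copies['present']) < 2   # True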
- """ - result = self.archiver_db.content_archive_get(content_id) - if not result: - return None - _, present, ongoing = result - set_present = set_objstorages & set(present) - set_ongoing = set_objstorages & set(ongoing) - set_missing = set_objstorages - set_present - set_ongoing - return { - 'present': set_present, - 'missing': set_missing, - 'ongoing': {archive: value - for archive, value in ongoing.items() - if archive in set_ongoing}, - } - - def run_copier(self, source, destination, content_ids): - """Run a copier in order to archive the given contents. - - Upload the given contents from the source to the destination. - If the process fails, the whole content is considered uncopied - and remains 'ongoing', waiting to be rescheduled as there is a - delay. - - Args: - source (str): source storage's identifier - destination (str): destination storage's identifier - content_ids ([sha1]): list of content ids to archive. - - """ - # Check if there are any errors among the contents. - content_status = self.get_contents_error(content_ids, source) - - # Iterates over the error detected. - for content_id, real_status in content_status.items(): - # Remove them from the to-archive list, - # as they cannot be retrieved correctly. - content_ids.remove(content_id) - # Update their status to reflect their real state. - self.archiver_db.content_archive_update( - content_id, archive_id=source, new_status=real_status) - - # Now perform the copy on the remaining contents - ac = ArchiverCopier( - source=self.objstorages[source], - destination=self.objstorages[destination], - content_ids=content_ids) - - if ac.run(): - # Once the archival complete, update the database. - for content_id in content_ids: - self.archiver_db.content_archive_update( - content_id, archive_id=destination, new_status='present') - - return content_ids - return [] - - def copy_finished(self, content_ids): - """Hook to notify the content_ids archive copy is finished. - (This is not an abstract method as this is optional - """ - pass - - def get_contents_error(self, content_ids, source_storage): - """Indicates what is the error associated to a content when needed - - Check the given content on the given storage. If an error is detected, - it will be reported through the returned dict. - - Args: - content_ids ([sha1]): list of content ids to check - source_storage (str): the source storage holding the - contents to check. - - Returns: - a dict that map {content_id -> error_status} for each content_id - with an error. The `error_status` result may be 'missing' or - 'corrupted'. - - """ - content_status = {} - storage = self.objstorages[source_storage] - for content_id in content_ids: - try: - storage.check(content_id) - except Error: - content_status[content_id] = 'corrupted' - logger.error('%s corrupted!' % hashutil.hash_to_hex( - content_id)) - except ObjNotFoundError: - content_status[content_id] = 'missing' - logger.error('%s missing!' % hashutil.hash_to_hex(content_id)) - - return content_status - - @abc.abstractmethod - def need_archival(self, content_data): - """Indicate if the content needs to be archived. - - Args: - content_data (dict): dict that contains two lists 'present' and - 'missing' with copies id corresponding to this status. - - Returns: - True if there is not enough copies, False otherwise. 
- - """ - pass - - @abc.abstractmethod - def choose_backup_servers(self, present, missing): - """Choose and yield the required amount of couple source/destination - - For each required copy, choose a unique destination server - among the missing copies and a source server among the - presents. - - Args: - present: set of objstorage source name where the content - is present - missing: set of objstorage destination name where the - content is missing - - Yields: - tuple (source (str), destination (src)) for each required copy. - - """ - pass - - -class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker): - """ Do the required backups on a given batch of contents. - - Process the content of a content batch in order to do the needed backups on - the slaves servers. - """ - - ADDITIONAL_CONFIG = { - 'retention_policy': ('int', 2), - 'archival_max_age': ('int', 3600), - } - - def __init__(self, batch): - """ Constructor of the ArchiverWorker class. - - Args: - batch: list of object's sha1 that potentially need archival. - """ - super().__init__(batch) - config = self.config - self.retention_policy = config['retention_policy'] - self.archival_max_age = config['archival_max_age'] - - if len(self.objstorages) < self.retention_policy: - raise ValueError('Retention policy is too high for the number of ' - 'provided servers') - - def need_archival(self, content_data): - """ Indicate if the content need to be archived. - - Args: - content_data (dict): dict that contains two lists 'present' and - 'missing' with copies id corresponding to this status. - Returns: True if there is not enough copies, False otherwise. - """ - nb_presents = len(content_data.get('present', [])) - for copy, mtime in content_data.get('ongoing', {}).items(): - if not self._is_archival_delay_elapsed(mtime): - nb_presents += 1 - return nb_presents < self.retention_policy - - def _is_archival_delay_elapsed(self, start_time): - """ Indicates if the archival delay is elapsed given the start_time - - Args: - start_time (float): time at which the archival started. - - Returns: - True if the archival delay is elasped, False otherwise - """ - elapsed = time.time() - start_time - return elapsed > self.archival_max_age - - def choose_backup_servers(self, present, missing): - """Choose and yield the required amount of couple source/destination - - For each required copy, choose a unique destination server - among the missing copies and a source server among the - presents. - - Each destination server is unique so after archival, the - retention policy requirement will be fulfilled. However, the - source server may be used multiple times. - - Args: - present: set of objstorage source name where the content - is present - missing: set of objstorage destination name where the - content is missing - - Yields: - tuple (source, destination) for each required copy. - - """ - # Transform from set to list to allow random selections - missing = list(missing) - present = list(present) - nb_required = self.retention_policy - len(present) - destinations = random.sample(missing, nb_required) - sources = [random.choice(present) for dest in destinations] - yield from zip(sources, destinations) - - -class ArchiverToBackendWorker(BaseArchiveWorker): - """Worker that sends copies over from a source to another backend. - - Process the content of a content batch from source objstorage to - destination objstorage. 
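# [Editor's note, self-contained sketch of the pairing implemented just above]
# With the default retention_policy of 2 and a content held only by 'uffizi',
# exactly one transfer is generated, towards one randomly chosen missing
# archive ('s3' is a placeholder name):
import random

present, missing = ['uffizi'], ['banco', 's3']
retention_policy = 2
nb_required = retention_policy - len(present)      # one more copy needed
destinations = random.sample(missing, nb_required)
sources = [random.choice(present) for _ in destinations]
print(list(zip(sources, destinations)))            # e.g. [('uffizi', 'banco')]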
- - """ - - CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' - - ADDITIONAL_CONFIG = { - 'next_task': ( - 'dict', { - 'queue': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask', - 'batch_size': 10, - } - ) - } - - def __init__(self, destination, batch): - """Constructor of the ArchiverWorkerToBackend class. - - Args: - destination: where to copy the objects from - batch: sha1s to send to destination - - """ - super().__init__(batch) - self.destination = destination - next_task = self.config['next_task'] - if next_task: - destination_queue = next_task['queue'] - self.task_destination = get_task(destination_queue) - self.batch_size = int(next_task['batch_size']) - else: - self.task_destination = self.batch_size = None - - def need_archival(self, content_data): - """Indicate if the content needs to be archived. - - Args: - content_data (dict): dict that contains 3 lists 'present', - 'ongoing' and 'missing' with copies id corresponding to - this status. - - Returns: - True if we need to archive, False otherwise - - """ - return self.destination in content_data.get('missing', {}) - - def choose_backup_servers(self, present, missing): - """The destination is fixed to the destination mentioned. - - The only variable here is the source of information that we - choose randomly in 'present'. - - Args: - present: set of objstorage source name where the content - is present - missing: set of objstorage destination name where the - content is missing - - Yields: - tuple (source, destination) for each required copy. - - """ - yield (random.choice(list(present)), self.destination) - - def copy_finished(self, content_ids): - """Once the copy is finished, we'll send those batch of contents as - done in the destination queue. - - """ - if self.task_destination: - groups = [] - for ids in utils.grouper(content_ids, self.batch_size): - sig_ids = self.task_destination.s(list(ids)) - groups.append(sig_ids) - - group(groups).delay() diff --git a/swh/storage/db.py b/swh/storage/db.py index 651280236..367ccbc6a 100644 --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -1,987 +1,987 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import functools import json import psycopg2 import psycopg2.extras import select import tempfile from contextlib import contextmanager from swh.model import hashutil TMP_CONTENT_TABLE = 'tmp_content' psycopg2.extras.register_uuid() def stored_procedure(stored_proc): """decorator to execute remote stored procedure, specified as argument Generally, the body of the decorated function should be empty. If it is not, the stored procedure will be executed first; the function body then. 
""" def wrap(meth): @functools.wraps(meth) def _meth(self, *args, **kwargs): cur = kwargs.get('cur', None) self._cursor(cur).execute('SELECT %s()' % stored_proc) meth(self, *args, **kwargs) return _meth return wrap def jsonize(value): """Convert a value to a psycopg2 JSON object if necessary""" if isinstance(value, dict): return psycopg2.extras.Json(value) return value def entry_to_bytes(entry): """Convert an entry coming from the database to bytes""" if isinstance(entry, memoryview): return entry.tobytes() if isinstance(entry, list): return [entry_to_bytes(value) for value in entry] return entry def line_to_bytes(line): """Convert a line coming from the database to bytes""" if not line: return line if isinstance(line, dict): return {k: entry_to_bytes(v) for k, v in line.items()} return line.__class__(entry_to_bytes(entry) for entry in line) def cursor_to_bytes(cursor): """Yield all the data from a cursor as bytes""" yield from (line_to_bytes(line) for line in cursor) class BaseDb: """Base class for swh.storage.*Db. - cf. swh.storage.db.Db, swh.storage.archiver.db.ArchiverDb + cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb """ @classmethod def connect(cls, *args, **kwargs): """factory method to create a DB proxy Accepts all arguments of psycopg2.connect; only some specific possibilities are reported below. Args: connstring: libpq2 connection string """ conn = psycopg2.connect(*args, **kwargs) return cls(conn) def _cursor(self, cur_arg): """get a cursor: from cur_arg if given, or a fresh one otherwise meant to avoid boilerplate if/then/else in methods that proxy stored procedures """ if cur_arg is not None: return cur_arg # elif self.cur is not None: # return self.cur else: return self.conn.cursor() def __init__(self, conn): """create a DB proxy Args: conn: psycopg2 connection to the SWH DB """ self.conn = conn @contextmanager def transaction(self): """context manager to execute within a DB transaction Yields: a psycopg2 cursor """ with self.conn.cursor() as cur: try: yield cur self.conn.commit() except: if not self.conn.closed: self.conn.rollback() raise def copy_to(self, items, tblname, columns, cur=None, item_cb=None): """Copy items' entries to table tblname with columns information. Args: items (dict): dictionary of data to copy over tblname tblname (str): Destination table's name columns ([str]): keys to access data in items and also the column names in the destination table. 
item_cb (fn): optional function to apply to items's entry """ def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, datetime.datetime): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k)) for k in columns] f.write(','.join(line)) f.write('\n') f.seek(0) self._cursor(cur).copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) def mktemp(self, tblname, cur=None): self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) class Db(BaseDb): """Proxy to the SWH DB, with wrappers around stored procedures """ def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)', (('directory_entry_%s' % entry_type),)) @stored_procedure('swh_mktemp_revision') def mktemp_revision(self, cur=None): pass @stored_procedure('swh_mktemp_release') def mktemp_release(self, cur=None): pass @stored_procedure('swh_mktemp_occurrence_history') def mktemp_occurrence_history(self, cur=None): pass @stored_procedure('swh_mktemp_entity_lister') def mktemp_entity_lister(self, cur=None): pass @stored_procedure('swh_mktemp_entity_history') def mktemp_entity_history(self, cur=None): pass @stored_procedure('swh_mktemp_bytea') def mktemp_bytea(self, cur=None): pass @stored_procedure('swh_mktemp_content_ctags') def mktemp_content_ctags(self, cur=None): pass @stored_procedure('swh_mktemp_content_ctags_missing') def mktemp_content_ctags_missing(self, cur=None): pass def register_listener(self, notify_queue, cur=None): """Register a listener for NOTIFY queue `notify_queue`""" self._cursor(cur).execute("LISTEN %s" % notify_queue) def listen_notifies(self, timeout): """Listen to notifications for `timeout` seconds""" if select.select([self.conn], [], [], timeout) == ([], [], []): return else: self.conn.poll() while self.conn.notifies: yield self.conn.notifies.pop(0) @stored_procedure('swh_content_add') def content_add_from_temp(self, cur=None): pass @stored_procedure('swh_directory_add') def directory_add_from_temp(self, cur=None): pass @stored_procedure('swh_skipped_content_add') def skipped_content_add_from_temp(self, cur=None): pass @stored_procedure('swh_revision_add') def revision_add_from_temp(self, cur=None): pass @stored_procedure('swh_release_add') def release_add_from_temp(self, cur=None): pass @stored_procedure('swh_occurrence_history_add') def occurrence_history_add_from_temp(self, cur=None): pass @stored_procedure('swh_entity_history_add') def entity_history_add_from_temp(self, cur=None): pass @stored_procedure('swh_cache_content_revision_add') def cache_content_revision_add(self, cur=None): pass def store_tmp_bytea(self, ids, cur=None): """Store 
the given identifiers in a new tmp_bytea table""" cur = self._cursor(cur) self.mktemp_bytea(cur) self.copy_to(({'id': elem} for elem in ids), 'tmp_bytea', ['id'], cur) def content_update_from_temp(self, keys_to_update, cur=None): cur = self._cursor(cur) cur.execute("""select swh_content_update(ARRAY[%s] :: text[])""" % keys_to_update) content_get_metadata_keys = [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status'] skipped_content_keys = [ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'reason', 'status', 'origin'] def content_get_metadata_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""select t.id as sha1, %s from tmp_bytea t left join content on t.id = content.sha1 """ % ', '.join(self.content_get_metadata_keys[1:])) yield from cursor_to_bytes(cur) def content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256 FROM swh_content_missing()""") yield from cursor_to_bytes(cur) def content_missing_per_sha1_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT * FROM swh_content_missing_per_sha1()""") yield from cursor_to_bytes(cur) def skipped_content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256 FROM swh_skipped_content_missing()""") yield from cursor_to_bytes(cur) def occurrence_get(self, origin_id, cur=None): """Retrieve latest occurrence's information by origin_id. """ cur = self._cursor(cur) cur.execute("""SELECT origin, branch, target, target_type, (select max(date) from origin_visit where origin=%s) as date FROM occurrence WHERE origin=%s """, (origin_id, origin_id)) yield from cursor_to_bytes(cur) content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status'] def content_find(self, sha1=None, sha1_git=None, sha256=None, blake2s256=None, cur=None): """Find the content optionally on a combination of the following checksums sha1, sha1_git, sha256 or blake2s256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content blake2s256: blake2s256 content Returns: The tuple (sha1, sha1_git, sha256, blake2s256) if found or None. 
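# [Editor's note, hypothetical lookup; the DSN and hash below are placeholders]
# Note that the row returned by content_find() carries every column listed in
# content_find_cols (including length, ctime and status), not only the four
# checksums mentioned above:
db = Db.connect('dbname=softwareheritage-dev')
row = db.content_find(
    sha1=bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689'))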
""" cur = self._cursor(cur) cur.execute("""SELECT %s FROM swh_content_find(%%s, %%s, %%s, %%s) LIMIT 1""" % ','.join(self.content_find_cols), (sha1, sha1_git, sha256, blake2s256)) content = line_to_bytes(cur.fetchone()) if set(content) == {None}: return None else: return content provenance_cols = ['content', 'revision', 'origin', 'visit', 'path'] def content_find_provenance(self, sha1_git, cur=None): """Find content's provenance information Args: sha1: sha1_git content cur: cursor to use Returns: Provenance information on such content """ cur = self._cursor(cur) cur.execute("""SELECT content, revision, origin, visit, path FROM swh_content_find_provenance(%s)""", (sha1_git, )) yield from cursor_to_bytes(cur) def directory_get_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('''SELECT id, file_entries, dir_entries, rev_entries FROM swh_directory_get()''') yield from cursor_to_bytes(cur) def directory_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_missing()') yield from cursor_to_bytes(cur) directory_ls_cols = ['dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256', 'length'] def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cols = ', '.join(self.directory_ls_cols) query = 'SELECT %s FROM swh_directory_walk_one(%%s)' % cols cur.execute(query, (directory,)) yield from cursor_to_bytes(cur) def directory_walk(self, directory, cur=None): cur = self._cursor(cur) cols = ', '.join(self.directory_ls_cols) query = 'SELECT %s FROM swh_directory_walk(%%s)' % cols cur.execute(query, (directory,)) yield from cursor_to_bytes(cur) def directory_entry_get_by_path(self, directory, paths, cur=None): """Retrieve a directory entry by path. """ cur = self._cursor(cur) cols = ', '.join(self.directory_ls_cols) query = ( 'SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)' % cols) cur.execute(query, (directory, paths)) data = cur.fetchone() if set(data) == {None}: return None return line_to_bytes(data) def revision_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_revision_missing() as r(id)') yield from cursor_to_bytes(cur) revision_add_cols = [ 'id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date', 'committer_date_offset', 'committer_date_neg_utc_offset', 'type', 'directory', 'message', 'author_fullname', 'author_name', 'author_email', 'committer_fullname', 'committer_name', 'committer_email', 'metadata', 'synthetic', ] revision_get_cols = revision_add_cols + [ 'author_id', 'committer_id', 'parents'] def origin_visit_add(self, origin, ts, cur=None): """Add a new origin_visit for origin origin at timestamp ts with status 'ongoing'. Args: origin: origin concerned by the visit ts: the date of the visit Returns: The new visit index step for that origin """ cur = self._cursor(cur) self._cursor(cur).execute('SELECT swh_origin_visit_add(%s, %s)', (origin, ts)) return cur.fetchone()[0] def origin_visit_update(self, origin, visit_id, status, metadata, cur=None): """Update origin_visit's status.""" cur = self._cursor(cur) update = """UPDATE origin_visit SET status=%s, metadata=%s WHERE origin=%s AND visit=%s""" cur.execute(update, (status, jsonize(metadata), origin, visit_id)) origin_visit_get_cols = ['origin', 'visit', 'date', 'status', 'metadata'] def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None): """Retrieve all visits for origin with id origin_id. 
Args: origin_id: The occurrence's origin Yields: The occurrence's history visits """ cur = self._cursor(cur) query_suffix = '' if last_visit: query_suffix += ' AND %s < visit' % last_visit if limit: query_suffix += ' LIMIT %s' % limit query = """\ SELECT %s FROM origin_visit WHERE origin=%%s %s""" % ( ', '.join(self.origin_visit_get_cols), query_suffix) cur.execute(query, (origin_id, )) yield from cursor_to_bytes(cur) def origin_visit_get(self, origin_id, visit_id, cur=None): """Retrieve information on visit visit_id of origin origin_id. Args: origin_id: the origin concerned visit_id: The visit step for that origin Returns: The origin_visit information """ cur = self._cursor(cur) query = """\ SELECT %s FROM origin_visit WHERE origin = %%s AND visit = %%s """ % (', '.join(self.origin_visit_get_cols)) cur.execute(query, (origin_id, visit_id)) r = cur.fetchall() if not r: return None return line_to_bytes(r[0]) occurrence_cols = ['origin', 'branch', 'target', 'target_type'] def occurrence_by_origin_visit(self, origin_id, visit_id, cur=None): """Retrieve all occurrences for a particular origin_visit. Args: origin_id: the origin concerned visit_id: The visit step for that origin Yields: The occurrence's history visits """ cur = self._cursor(cur) query = """\ SELECT %s FROM swh_occurrence_by_origin_visit(%%s, %%s) """ % (', '.join(self.occurrence_cols)) cur.execute(query, (origin_id, visit_id)) yield from cursor_to_bytes(cur) def revision_get_from_temp(self, cur=None): cur = self._cursor(cur) query = 'SELECT %s FROM swh_revision_get()' % ( ', '.join(self.revision_get_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_log(%%s, %%s) """ % ', '.join(self.revision_get_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) revision_shortlog_cols = ['id', 'parents'] def revision_shortlog(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_list(%%s, %%s) """ % ', '.join(self.revision_shortlog_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) cache_content_get_cols = [ 'sha1', 'sha1_git', 'sha256', 'revision_paths'] def cache_content_get_all(self, cur=None): """Retrieve cache contents' sha1, sha256, sha1_git """ cur = self._cursor(cur) cur.execute('SELECT * FROM swh_cache_content_get_all()') yield from cursor_to_bytes(cur) def cache_content_get(self, sha1_git, cur=None): """Retrieve cache content information sh. 
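# [Editor's note, minimal sketch; db is an open Db connection as above and ids
# a placeholder list of sha1 bytes] The *_from_temp helpers earlier in this
# class share one pattern: stage the identifiers in a temporary table within a
# single transaction, then call the matching swh_* SQL function, e.g.:
with db.transaction() as cur:
    db.store_tmp_bytea(ids, cur)
    missing = list(db.content_missing_per_sha1_from_temp(cur))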
""" cur = self._cursor(cur) cur.execute('SELECT * FROM swh_cache_content_get(%s)', (sha1_git, )) data = cur.fetchone() if data: return line_to_bytes(data) return None def cache_revision_origin_add(self, origin, visit, cur=None): """Populate the content provenance information cache for the given (origin, visit) couple.""" cur = self._cursor(cur) cur.execute('SELECT * FROM swh_cache_revision_origin_add(%s, %s)', (origin, visit)) yield from cursor_to_bytes(cur) def release_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_release_missing() as r(id)') yield from cursor_to_bytes(cur) object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id'] def object_find_by_sha1_git(self, ids, cur=None): cur = self._cursor(cur) self.store_tmp_bytea(ids, cur) query = 'select %s from swh_object_find_by_sha1_git()' % ( ', '.join(self.object_find_by_sha1_git_cols) ) cur.execute(query) yield from cursor_to_bytes(cur) def stat_counters(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_stat_counters()') yield from cur fetch_history_cols = ['origin', 'date', 'status', 'result', 'stdout', 'stderr', 'duration'] def create_fetch_history(self, fetch_history, cur=None): """Create a fetch_history entry with the data in fetch_history""" cur = self._cursor(cur) query = '''INSERT INTO fetch_history (%s) VALUES (%s) RETURNING id''' % ( ','.join(self.fetch_history_cols), ','.join(['%s'] * len(self.fetch_history_cols)) ) cur.execute(query, [fetch_history.get(col) for col in self.fetch_history_cols]) return cur.fetchone()[0] def get_fetch_history(self, fetch_history_id, cur=None): """Get a fetch_history entry with the given id""" cur = self._cursor(cur) query = '''SELECT %s FROM fetch_history WHERE id=%%s''' % ( ', '.join(self.fetch_history_cols), ) cur.execute(query, (fetch_history_id,)) data = cur.fetchone() if not data: return None ret = {'id': fetch_history_id} for i, col in enumerate(self.fetch_history_cols): ret[col] = data[i] return ret def update_fetch_history(self, fetch_history, cur=None): """Update the fetch_history entry from the data in fetch_history""" cur = self._cursor(cur) query = '''UPDATE fetch_history SET %s WHERE id=%%s''' % ( ','.join('%s=%%s' % col for col in self.fetch_history_cols) ) cur.execute(query, [jsonize(fetch_history.get(col)) for col in self.fetch_history_cols + ['id']]) base_entity_cols = ['uuid', 'parent', 'name', 'type', 'description', 'homepage', 'active', 'generated', 'lister_metadata', 'metadata'] entity_cols = base_entity_cols + ['last_seen', 'last_id'] entity_history_cols = base_entity_cols + ['id', 'validity'] def origin_add(self, type, url, cur=None): """Insert a new origin and return the new identifier.""" insert = """INSERT INTO origin (type, url) values (%s, %s) RETURNING id""" cur.execute(insert, (type, url)) return cur.fetchone()[0] def origin_get_with(self, type, url, cur=None): """Retrieve the origin id from its type and url if found.""" cur = self._cursor(cur) query = """SELECT id, type, url, lister, project FROM origin WHERE type=%s AND url=%s""" cur.execute(query, (type, url)) data = cur.fetchone() if data: return line_to_bytes(data) return None def origin_get(self, id, cur=None): """Retrieve the origin per its identifier. 
""" cur = self._cursor(cur) query = "SELECT id, type, url, lister, project FROM origin WHERE id=%s" cur.execute(query, (id,)) data = cur.fetchone() if data: return line_to_bytes(data) return None person_cols = ['fullname', 'name', 'email'] person_get_cols = person_cols + ['id'] def person_add(self, person, cur=None): """Add a person identified by its name and email. Returns: The new person's id """ cur = self._cursor(cur) query_new_person = '''\ INSERT INTO person(%s) VALUES (%s) RETURNING id''' % ( ', '.join(self.person_cols), ', '.join('%s' for i in range(len(self.person_cols))) ) cur.execute(query_new_person, [person[col] for col in self.person_cols]) return cur.fetchone()[0] def person_get(self, ids, cur=None): """Retrieve the persons identified by the list of ids. """ cur = self._cursor(cur) query = """SELECT %s FROM person WHERE id IN %%s""" % ', '.join(self.person_get_cols) cur.execute(query, (tuple(ids),)) yield from cursor_to_bytes(cur) release_add_cols = [ 'id', 'target', 'target_type', 'date', 'date_offset', 'date_neg_utc_offset', 'name', 'comment', 'synthetic', 'author_fullname', 'author_name', 'author_email', ] release_get_cols = release_add_cols + ['author_id'] def release_get_from_temp(self, cur=None): cur = self._cursor(cur) query = ''' SELECT %s FROM swh_release_get() ''' % ', '.join(self.release_get_cols) cur.execute(query) yield from cursor_to_bytes(cur) def release_get_by(self, origin_id, limit=None, cur=None): """Retrieve a release by occurrence criterion (only origin right now) Args: - origin_id: The origin to look for. """ cur = self._cursor(cur) query = """ SELECT %s FROM swh_release_get_by(%%s) LIMIT %%s """ % ', '.join(self.release_get_cols) cur.execute(query, (origin_id, limit)) yield from cursor_to_bytes(cur) def revision_get_by(self, origin_id, branch_name, datetime, limit=None, cur=None): """Retrieve a revision by occurrence criterion. Args: - origin_id: The origin to look for - branch_name: the branch name to look for - datetime: the lower bound of timerange to look for. - limit: limit number of results to return The upper bound being now. """ cur = self._cursor(cur) if branch_name and isinstance(branch_name, str): branch_name = branch_name.encode('utf-8') query = ''' SELECT %s FROM swh_revision_get_by(%%s, %%s, %%s) LIMIT %%s ''' % ', '.join(self.revision_get_cols) cur.execute(query, (origin_id, branch_name, datetime, limit)) yield from cursor_to_bytes(cur) def entity_get(self, uuid, cur=None): """Retrieve the entity and its parent hierarchy chain per uuid. """ cur = self._cursor(cur) cur.execute("""SELECT %s FROM swh_entity_get(%%s)""" % ( ', '.join(self.entity_cols)), (uuid, )) yield from cursor_to_bytes(cur) def entity_get_one(self, uuid, cur=None): """Retrieve a single entity given its uuid. """ cur = self._cursor(cur) cur.execute("""SELECT %s FROM entity WHERE uuid = %%s""" % ( ', '.join(self.entity_cols)), (uuid, )) data = cur.fetchone() if not data: return None return line_to_bytes(data) content_mimetype_cols = [ 'id', 'mimetype', 'encoding', 'tool_id', 'tool_name', 'tool_version', 'tool_configuration'] @stored_procedure('swh_mktemp_content_mimetype_missing') def mktemp_content_mimetype_missing(self, cur=None): pass def content_mimetype_missing_from_temp(self, cur=None): """List missing mimetypes. 
""" cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_mimetype_missing()") yield from cursor_to_bytes(cur) @stored_procedure('swh_mktemp_content_mimetype') def mktemp_content_mimetype(self, cur=None): pass def content_mimetype_add_from_temp(self, conflict_update, cur=None): self._cursor(cur).execute("SELECT swh_content_mimetype_add(%s)", (conflict_update, )) def content_mimetype_get_from_temp(self, cur=None): cur = self._cursor(cur) query = "SELECT %s FROM swh_content_mimetype_get()" % ( ','.join(self.content_mimetype_cols)) cur.execute(query) yield from cursor_to_bytes(cur) content_language_cols = [ 'id', 'lang', 'tool_id', 'tool_name', 'tool_version', 'tool_configuration'] @stored_procedure('swh_mktemp_content_language') def mktemp_content_language(self, cur=None): pass @stored_procedure('swh_mktemp_content_language_missing') def mktemp_content_language_missing(self, cur=None): pass def content_language_missing_from_temp(self, cur=None): """List missing languages. """ cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_language_missing()") yield from cursor_to_bytes(cur) def content_language_add_from_temp(self, conflict_update, cur=None): self._cursor(cur).execute("SELECT swh_content_language_add(%s)", (conflict_update, )) def content_language_get_from_temp(self, cur=None): cur = self._cursor(cur) query = "SELECT %s FROM swh_content_language_get()" % ( ','.join(self.content_language_cols)) cur.execute(query) yield from cursor_to_bytes(cur) content_ctags_cols = [ 'id', 'name', 'kind', 'line', 'lang', 'tool_id', 'tool_name', 'tool_version', 'tool_configuration'] def content_ctags_missing_from_temp(self, cur=None): """List missing ctags. """ cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_ctags_missing()") yield from cursor_to_bytes(cur) def content_ctags_add_from_temp(self, conflict_update, cur=None): self._cursor(cur).execute("SELECT swh_content_ctags_add(%s)", (conflict_update, )) def content_ctags_get_from_temp(self, cur=None): cur = self._cursor(cur) query = "SELECT %s FROM swh_content_ctags_get()" % ( ','.join(self.content_ctags_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def content_ctags_search(self, expression, last_sha1, limit, cur=None): cur = self._cursor(cur) if not last_sha1: query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s)""" % ( ','.join(self.content_ctags_cols)) cur.execute(query, (expression, limit)) else: if last_sha1 and isinstance(last_sha1, bytes): last_sha1 = '\\x%s' % hashutil.hash_to_hex(last_sha1) elif last_sha1: last_sha1 = '\\x%s' % last_sha1 query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s, %%s)""" % ( ','.join(self.content_ctags_cols)) cur.execute(query, (expression, limit, last_sha1)) yield from cursor_to_bytes(cur) content_fossology_license_cols = [ 'id', 'tool_id', 'tool_name', 'tool_version', 'tool_configuration', 'licenses'] @stored_procedure('swh_mktemp_content_fossology_license') def mktemp_content_fossology_license(self, cur=None): pass def content_fossology_license_add_from_temp(self, conflict_update, cur=None): """Add new licenses per content. """ self._cursor(cur).execute( "SELECT swh_content_fossology_license_add(%s)", (conflict_update, )) def content_fossology_license_get_from_temp(self, cur=None): """Retrieve licenses per content. 
""" cur = self._cursor(cur) query = "SELECT %s FROM swh_content_fossology_license_get()" % ( ','.join(self.content_fossology_license_cols)) cur.execute(query) yield from cursor_to_bytes(cur) indexer_configuration_cols = ['id', 'tool_name', 'tool_version', 'tool_configuration'] def indexer_configuration_get(self, tool_name, tool_version, tool_configuration, cur=None): cur = self._cursor(cur) cur.execute('''select %s from indexer_configuration where tool_name=%%s and tool_version=%%s and tool_configuration=%%s''' % ( ','.join(self.indexer_configuration_cols)), (tool_name, tool_version, tool_configuration)) data = cur.fetchone() if not data: return None return line_to_bytes(data) diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py deleted file mode 100644 index d48d83d33..000000000 --- a/swh/storage/tests/test_archiver.py +++ /dev/null @@ -1,486 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import glob -import tempfile -import shutil -import unittest -import os -import time - -from nose.tools import istest -from nose.plugins.attrib import attr - -from swh.core.tests.db_testing import DbsTestFixture - -from swh.storage.archiver.storage import get_archiver_storage - -from swh.storage.archiver import ArchiverWithRetentionPolicyDirector -from swh.storage.archiver import ArchiverWithRetentionPolicyWorker -from swh.objstorage import get_objstorage -from swh.objstorage.exc import ObjNotFoundError - -try: - # objstorage > 0.17 - from swh.objstorage.api.server import make_app as app - from server_testing import ServerTestFixtureAsync as ServerTestFixture - MIGRATED = True -except ImportError: - # objstorage <= 0.17 - from swh.objstorage.api.server import app - from server_testing import ServerTestFixture - MIGRATED = False - -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') - - -@attr('db') -class TestArchiver(DbsTestFixture, ServerTestFixture, - unittest.TestCase): - """ Test the objstorage archiver. 
- """ - - TEST_DB_NAMES = [ - 'softwareheritage-archiver-test', - ] - TEST_DB_DUMPS = [ - os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), - ] - TEST_DB_DUMP_TYPES = [ - 'pg_dump', - ] - - def setUp(self): - # Launch the backup server - self.dest_root = tempfile.mkdtemp(prefix='remote') - self.config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6', - } - } - if MIGRATED: - self.app = app(self.config) - else: - self.app = app - super().setUp() - - # Retrieve connection (depends on the order in TEST_DB_NAMES) - self.conn = self.conns[0] # archiver db's connection - self.cursor = self.cursors[0] - - # Create source storage - self.src_root = tempfile.mkdtemp() - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - # Create destination storage - dest_config = { - 'cls': 'remote', - 'args': { - 'url': self.url() - } - } - self.dest_storage = get_objstorage(**dest_config) - - # Keep mapped the id to the storages - self.storages = { - 'uffizi': self.src_storage, - 'banco': self.dest_storage - } - - # Override configurations - src_archiver_conf = {'host': 'uffizi'} - dest_archiver_conf = {'host': 'banco'} - src_archiver_conf.update(src_config) - dest_archiver_conf.update(dest_config) - self.archiver_storages = [src_archiver_conf, dest_archiver_conf] - self._override_director_config() - self._override_worker_config() - # Create the base archiver - self.archiver = self._create_director() - - def tearDown(self): - self.empty_tables() - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - super().tearDown() - - def empty_tables(self): - # Remove all content - self.cursor.execute('DELETE FROM content') - self.cursor.execute('DELETE FROM content_copies') - self.conn.commit() - - def _override_director_config(self, retention_policy=2): - """ Override the default config of the Archiver director - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. - """ - ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'batch_max_size': 5000, - 'archival_max_age': 3600, - 'retention_policy': retention_policy, - 'asynchronous': False, - } - - def _override_worker_config(self): - """ Override the default config of the Archiver worker - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. - """ - ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa - 'retention_policy': 2, - 'archival_max_age': 3600, - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'storages': self.archiver_storages, - 'source': 'uffizi', - } - - def _create_director(self): - return ArchiverWithRetentionPolicyDirector() - - def _create_worker(self, batch={}): - return ArchiverWithRetentionPolicyWorker(batch) - - def _add_content(self, storage_name, content_data): - """ Add really a content to the given objstorage - - This put an empty status for the added content. 
- - Args: - storage_name: the concerned storage - content_data: the data to insert - with_row_insert: to insert a row entry in the db or not - - """ - # Add the content to the storage - obj_id = self.storages[storage_name].add(content_data) - self.cursor.execute(""" INSERT INTO content (sha1) - VALUES (%s) - """, (obj_id,)) - return obj_id - - def _update_status(self, obj_id, storage_name, status, date=None): - """ Update the db status for the given id/storage_name. - - This does not create the content in the storage. - """ - self.cursor.execute("""insert into archive (name) - values (%s) - on conflict do nothing""", (storage_name,)) - - self.archiver.archiver_storage.content_archive_update( - obj_id, storage_name, status - ) - - # Integration test - @istest - def archive_missing_content(self): - """ Run archiver on a missing content should archive it. - """ - obj_data = b'archive_missing_content' - obj_id = self._add_content('uffizi', obj_data) - self._update_status(obj_id, 'uffizi', 'present') - # Content is missing on banco (entry not present in the db) - try: - self.dest_storage.get(obj_id) - except ObjNotFoundError: - pass - else: - self.fail('Content should not be present before archival') - self.archiver.run() - # now the content should be present on remote objstorage - remote_data = self.dest_storage.get(obj_id) - self.assertEquals(obj_data, remote_data) - - @istest - def archive_present_content(self): - """ A content that is not 'missing' shouldn't be archived. - """ - obj_id = self._add_content('uffizi', b'archive_present_content') - self._update_status(obj_id, 'uffizi', 'present') - self._update_status(obj_id, 'banco', 'present') - # After the run, the content should NOT be in the archive. - # As the archiver believe it was already in. - self.archiver.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - @istest - def archive_already_enough(self): - """ A content missing with enough copies shouldn't be archived. - """ - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self._override_director_config(retention_policy=1) - director = self._create_director() - # Obj is present in only one archive but only one copy is required. - director.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - @istest - def content_archive_get_copies(self): - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [], - ) - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [(obj_id, ['uffizi'], {})], - ) - - # Unit tests for archive worker - - def archival_elapsed(self, mtime): - return self._create_worker()._is_archival_delay_elapsed(mtime) - - @istest - def vstatus_ongoing_remaining(self): - self.assertFalse(self.archival_elapsed(time.time())) - - @istest - def vstatus_ongoing_elapsed(self): - past_time = ( - time.time() - self._create_worker().archival_max_age - ) - self.assertTrue(self.archival_elapsed(past_time)) - - def _status(self, status, mtime=None): - """ Get a dict that match the copies structure - """ - return {'status': status, 'mtime': mtime or time.time()} - - @istest - def need_archival_missing(self): - """ A content should need archival when it is missing. 
- """ - status_copies = {'present': ['uffizi'], 'missing': ['banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - True) - - @istest - def need_archival_present(self): - """ A content present everywhere shouldn't need archival - """ - status_copies = {'present': ['uffizi', 'banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - False) - - def _compute_copies_status(self, status): - """ A content with a given status should be detected correctly - """ - obj_id = self._add_content( - 'banco', b'compute_copies_' + bytes(status, 'utf8')) - self._update_status(obj_id, 'banco', status) - worker = self._create_worker() - self.assertIn('banco', worker.compute_copies( - set(worker.objstorages), obj_id)[status]) - - @istest - def compute_copies_present(self): - """ A present content should be detected with correct status - """ - self._compute_copies_status('present') - - @istest - def compute_copies_missing(self): - """ A missing content should be detected with correct status - """ - self._compute_copies_status('missing') - - @istest - def compute_copies_extra_archive(self): - obj_id = self._add_content('banco', b'foobar') - self._update_status(obj_id, 'banco', 'present') - self._update_status(obj_id, 'random_archive', 'present') - worker = self._create_worker() - copies = worker.compute_copies(set(worker.objstorages), obj_id) - self.assertEqual(copies['present'], {'banco'}) - self.assertEqual(copies['missing'], {'uffizi'}) - - def _get_backups(self, present, missing): - """ Return a list of the pair src/dest from the present and missing - """ - worker = self._create_worker() - return list(worker.choose_backup_servers(present, missing)) - - @istest - def choose_backup_servers(self): - self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) - self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) - # Even with more possible destinations, do not take more than the - # retention_policy require - self.assertEqual( - len(self._get_backups(['uffizi'], ['banco', 's3'])), - 1 - ) - - -class TestArchiverStorageStub(unittest.TestCase): - def setUp(self): - self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') - self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') - self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') - - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - # Create destination storage - dest_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.dest_storage = get_objstorage(**dest_config) - - self.config = { - 'cls': 'stub', - 'args': { - 'archives': { - 'present_archive': 'http://uffizi:5003', - 'missing_archive': 'http://banco:5003', - }, - 'present': ['present_archive'], - 'missing': ['missing_archive'], - 'logfile_base': os.path.join(self.log_root, 'log_'), - } - } - - # Generated with: - # - # id_length = 20 - # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') - # - self.content_ids = [ - b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", - b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', - b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', - b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', - b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', - ] - - self.archiver_storage = 
get_archiver_storage(**self.config) - super().setUp() - - def tearDown(self): - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - shutil.rmtree(self.log_root) - super().tearDown() - - @istest - def archive_ls(self): - self.assertCountEqual( - self.archiver_storage.archive_ls(), - self.config['args']['archives'].items() - ) - - @istest - def content_archive_get(self): - for content_id in self.content_ids: - self.assertEqual( - self.archiver_storage.content_archive_get(content_id), - (content_id, set(self.config['args']['present']), {}), - ) - - @istest - def content_archive_get_copies(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_copies(), - [], - ) - - @istest - def content_archive_get_unarchived_copies(self): - retention_policy = 2 - self.assertCountEqual( - self.archiver_storage.content_archive_get_unarchived_copies( - retention_policy), - [], - ) - - @istest - def content_archive_get_missing(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'missing_archive' - ), - self.content_ids, - ) - - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'present_archive' - ), - [], - ) - - with self.assertRaises(ValueError): - list(self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'unknown_archive' - )) - - @istest - def content_archive_get_unknown(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_unknown( - self.content_ids, - ), - [], - ) - - @istest - def content_archive_update(self): - for content_id in self.content_ids: - self.archiver_storage.content_archive_update( - content_id, 'present_archive', 'present') - self.archiver_storage.content_archive_update( - content_id, 'missing_archive', 'present') - - self.archiver_storage.close_logfile() - - # Make sure we created a logfile - files = glob.glob('%s*' % self.config['args']['logfile_base']) - self.assertEqual(len(files), 1) - - # make sure the logfile contains all our lines - lines = open(files[0]).readlines() - self.assertEqual(len(lines), 2 * len(self.content_ids)) diff --git a/version.txt b/version.txt index 91ab8c10f..dfa2863f6 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.87-0-g9c8455e \ No newline at end of file +v0.0.88-0-gb8c480f \ No newline at end of file
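Editor's note: for reference, the stub archiver storage exercised by the deleted test above was obtained through the get_archiver_storage factory removed earlier in this diff. A minimal sketch of that setup, with a placeholder logfile prefix and assuming the pre-removal swh.storage.archiver package is importable:

from swh.storage.archiver.storage import get_archiver_storage

archiver_storage = get_archiver_storage('stub', {
    'archives': {
        'present_archive': 'http://uffizi:5003',
        'missing_archive': 'http://banco:5003',
    },
    'present': ['present_archive'],
    'missing': ['missing_archive'],
    'logfile_base': '/tmp/swh-archiver-log_',   # placeholder prefix
})

# Every sha1 is reported missing from 'missing_archive' and none from
# 'present_archive', mirroring the assertions in the deleted test:
missing = list(archiver_storage.content_archive_get_missing(
    [b'\x00' * 20], 'missing_archive'))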