diff --git a/debian/control b/debian/control index 18b99f504..d3dfcc1f6 100644 --- a/debian/control +++ b/debian/control @@ -1,57 +1,48 @@ Source: swh-storage Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-click, python3-dateutil, python3-flask, python3-nose, python3-psycopg2, python3-requests, python3-setuptools, python3-swh.core (>= 0.0.28~), python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.17~), python3-swh.scheduler (>= 0.0.14~), python3-aiohttp, python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSTO/ Package: python3-swh.storage Architecture: all Depends: python3-swh.core (>= 0.0.28~), python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.17~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage utilities Package: python3-swh.storage.listener Architecture: all Depends: python3-swh.journal (>= 0.0.2~), python3-kafka (>= 1.3.1~), python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage listener -Package: python3-swh.storage.archiver -Architecture: all -Depends: python3-swh.scheduler (>= 0.0.14~), - python3-swh.journal, - python3-swh.storage (= ${binary:Version}), - ${misc:Depends}, - ${python3:Depends} -Description: Software Heritage storage Archiver - Package: python3-swh.storage.provenance Architecture: all Depends: python3-swh.scheduler (>= 0.0.14~), python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage Provenance diff --git a/debian/rules b/debian/rules index 5dbaab1b4..cf9189bd7 100755 --- a/debian/rules +++ b/debian/rules @@ -1,26 +1,23 @@ #!/usr/bin/make -f export PYBUILD_NAME=swh.storage %: dh $@ --with python3 --buildsystem=pybuild override_dh_install: dh_install for pyvers in $(shell py3versions -vr); do \ mkdir -p $(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \ mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/listener.py \ $(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \ - mkdir -p $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver ; \ - mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/* \ - $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/ ; \ mkdir -p $(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance ; \ mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/* \ $(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/ ; \ done override_dh_auto_test: PYBUILD_SYSTEM=custom \ PYBUILD_TEST_ARGS="cd {build_dir}; python{version} -m nose swh -sva '!db'" \ dh_auto_test diff --git a/sql/archiver/Makefile b/sql/archiver/Makefile deleted file mode 100644 index c132dbcce..000000000 --- a/sql/archiver/Makefile +++ /dev/null @@ -1,42 +0,0 @@ -# Depends: postgresql-client, postgresql-autodoc - -DBNAME = softwareheritage-archiver-dev -DOCDIR = autodoc - -SQL_INIT = ../swh-init.sql -SQL_SCHEMA = swh-archiver-schema.sql -SQL_FUNC = swh-archiver-func.sql -SQL_DATA = swh-archiver-data.sql -SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) 
$(SQL_DATA) - -PSQL_BIN = psql -PSQL_FLAGS = --single-transaction --echo-all -X -PSQL = $(PSQL_BIN) $(PSQL_FLAGS) - - -all: - -createdb: createdb-stamp -createdb-stamp: $(SQL_INIT) - createdb $(DBNAME) - touch $@ - -filldb: filldb-stamp -filldb-stamp: createdb-stamp - cat $(SQLS) | $(PSQL) $(DBNAME) - touch $@ - -dropdb: - -dropdb $(DBNAME) - -dumpdb: swh-archiver.dump -swh.dump: filldb-stamp - pg_dump -Fc $(DBNAME) > $@ - -clean: - rm -rf *-stamp $(DOCDIR)/ - -distclean: clean dropdb - rm -f swh.dump - -.PHONY: all initdb createdb dropdb doc clean diff --git a/sql/archiver/swh-archiver-data.sql b/sql/archiver/swh-archiver-data.sql deleted file mode 100644 index e4a70a25e..000000000 --- a/sql/archiver/swh-archiver-data.sql +++ /dev/null @@ -1,3 +0,0 @@ -INSERT INTO archive(name) VALUES('uffizi'); -INSERT INTO archive(name) VALUES('banco'); -INSERT INTO archive(name) VALUES('azure'); diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql deleted file mode 100644 index 750bfae2c..000000000 --- a/sql/archiver/swh-archiver-func.sql +++ /dev/null @@ -1,40 +0,0 @@ -create or replace function swh_mktemp_content() - returns void - language plpgsql -as $$ - begin - create temporary table tmp_content ( - sha1 sha1 not null - ) on commit drop; - return; - end -$$; - -create or replace function swh_content_copies_from_temp(archive_names text[]) - returns void - language plpgsql -as $$ - begin - with existing_content_ids as ( - select id - from content - inner join tmp_content on content.sha1 = tmp.sha1 - ), created_content_ids as ( - insert into content (sha1) - select sha1 from tmp_content - on conflict do nothing - returning id - ), content_ids as ( - select * from existing_content_ids - union all - select * from created_content_ids - ), archive_ids as ( - select id from archive - where name = any(archive_names) - ) insert into content_copies (content_id, archive_id, mtime, status) - select content_ids.id, archive_ids.id, now(), 'present' - from content_ids cross join archive_ids - on conflict (content_id, archive_id) do update - set mtime = excluded.mtime, status = excluded.status; - end -$$; diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql deleted file mode 100644 index e5849585f..000000000 --- a/sql/archiver/swh-archiver-schema.sql +++ /dev/null @@ -1,63 +0,0 @@ --- In order to archive the content of the object storage, add --- some tables to keep trace of what have already been archived. 
- -create table dbversion -( - version int primary key, - release timestamptz, - description text -); - -comment on table dbversion is 'Schema update tracking'; - -INSERT INTO dbversion(version, release, description) -VALUES(10, now(), 'Work In Progress'); - -CREATE TABLE archive ( - id bigserial PRIMARY KEY, - name text not null -); - -create unique index on archive(name); - -comment on table archive is 'The archives in which contents are stored'; -comment on column archive.id is 'Short identifier for archives'; -comment on column archive.name is 'Name of the archive'; - -CREATE TYPE archive_status AS ENUM ( - 'missing', - 'ongoing', - 'present', - 'corrupted' -); - -comment on type archive_status is 'Status of a given copy of a content'; - --- a SHA1 checksum (not necessarily originating from Git) -CREATE DOMAIN sha1 AS bytea CHECK (LENGTH(VALUE) = 20); - --- a bucket for which we count items -CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2); - -create table content ( - id bigserial primary key, - sha1 sha1 not null -); - -comment on table content is 'All the contents being archived by Software Heritage'; -comment on column content.id is 'Short id for the content being archived'; -comment on column content.sha1 is 'SHA1 hash of the content being archived'; - -create unique index on content(sha1); - -create table content_copies ( - content_id bigint not null, -- references content(id) - archive_id bigint not null, -- references archive(id) - mtime timestamptz, - status archive_status not null, - primary key (content_id, archive_id) -); - -comment on table content_copies is 'Tracking of all content copies in the archives'; -comment on column content_copies.mtime is 'Last update time of the copy'; -comment on column content_copies.status is 'Status of the copy'; diff --git a/sql/archiver/upgrades/002.sql b/sql/archiver/upgrades/002.sql deleted file mode 100644 index d83db0281..000000000 --- a/sql/archiver/upgrades/002.sql +++ /dev/null @@ -1,9 +0,0 @@ --- SWH DB schema upgrade --- from_version: 1 --- to_version: 2 --- description: Add a 'corrupted' status into the archive_content status - -INSERT INTO dbversion(version, release, description) -VALUES(2, now(), 'Work In Progress'); - -ALTER TYPE archive_status ADD VALUE 'corrupted'; diff --git a/sql/archiver/upgrades/003.sql b/sql/archiver/upgrades/003.sql deleted file mode 100644 index ba43f5268..000000000 --- a/sql/archiver/upgrades/003.sql +++ /dev/null @@ -1,25 +0,0 @@ --- SWH DB schema upgrade --- from_version: 2 --- to_version: 3 --- description: Add a 'num_present' cache column into the archive_content status - -INSERT INTO dbversion(version, release, description) -VALUES(3, now(), 'Work In Progress'); - -alter table content_archive add column num_present int default null; -comment on column content_archive.num_present is 'Number of copies marked as present (cache updated via trigger)'; - -create index concurrently on content_archive(num_present); - --- Keep the num_copies cache updated -CREATE FUNCTION update_num_present() RETURNS TRIGGER AS $$ - BEGIN - NEW.num_present := (select count(*) from jsonb_each(NEW.copies) where value->>'status' = 'present'); - RETURN new; - END; -$$ LANGUAGE PLPGSQL; - -CREATE TRIGGER update_num_present - BEFORE INSERT OR UPDATE OF copies ON content_archive - FOR EACH ROW - EXECUTE PROCEDURE update_num_present(); diff --git a/sql/archiver/upgrades/004.sql b/sql/archiver/upgrades/004.sql deleted file mode 100644 index bfb5ad314..000000000 --- a/sql/archiver/upgrades/004.sql +++ /dev/null @@ -1,44 
+0,0 @@ --- SWH DB schema upgrade --- from_version: 3 --- to_version: 4 --- description: Add azure instance - -INSERT INTO dbversion(version, release, description) -VALUES(4, now(), 'Work In Progress'); - -ALTER TABLE archive DROP COLUMN url; -ALTER TABLE archive ALTER COLUMN id SET DATA TYPE TEXT; - -INSERT INTO archive(id) VALUES ('azure'); - -create or replace function swh_mktemp_content_archive() - returns void - language sql -as $$ - create temporary table tmp_content_archive ( - like content_archive including defaults - ) on commit drop; - alter table tmp_content_archive drop column copies; - alter table tmp_content_archive drop column num_present; -$$; - -COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive'; - -create or replace function swh_content_archive_missing(backend_name text) - returns setof sha1 - language plpgsql -as $$ -begin - return query - select content_id - from tmp_content_archive tmp where exists ( - select 1 - from content_archive c - where tmp.content_id = c.content_id - and (not c.copies ? backend_name - or c.copies @> jsonb_build_object(backend_name, '{"status": "missing"}'::jsonb)) - ); -end -$$; - -COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend'; diff --git a/sql/archiver/upgrades/005.sql b/sql/archiver/upgrades/005.sql deleted file mode 100644 index bc50631c1..000000000 --- a/sql/archiver/upgrades/005.sql +++ /dev/null @@ -1,24 +0,0 @@ --- SWH DB schema upgrade --- from_version: 4 --- to_version: 5 --- description: List unknown sha1s from content_archive - -INSERT INTO dbversion(version, release, description) -VALUES(5, now(), 'Work In Progress'); - -create or replace function swh_content_archive_unknown() - returns setof sha1 - language plpgsql -as $$ -begin - return query - select content_id - from tmp_content_archive tmp where not exists ( - select 1 - from content_archive c - where tmp.content_id = c.content_id - ); -end -$$; - -COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1'; diff --git a/sql/archiver/upgrades/006.sql b/sql/archiver/upgrades/006.sql deleted file mode 100644 index d9d1b24ce..000000000 --- a/sql/archiver/upgrades/006.sql +++ /dev/null @@ -1,100 +0,0 @@ --- SWH DB schema upgrade --- from_version: 5 --- to_version: 6 --- description: Create a bucketed count of contents in the archive. 
- -INSERT INTO dbversion(version, release, description) -VALUES(6, now(), 'Work In Progress'); - --- a bucket for which we count items -CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2); - -CREATE TABLE content_archive_counts ( - archive text not null references archive(id), - bucket bucket not null, - count bigint, - primary key (archive, bucket) -); - -comment on table content_archive_counts is 'Bucketed count of archive contents'; -comment on column content_archive_counts.archive is 'the archive for which we''re counting'; -comment on column content_archive_counts.bucket is 'the bucket of items we''re counting'; -comment on column content_archive_counts.count is 'the number of items counted in the given bucket'; - - -CREATE FUNCTION count_copies(from_id bytea, to_id bytea) returns void language sql as $$ - with sample as ( - select content_id, copies from content_archive - where content_id > from_id and content_id <= to_id - ), data as ( - select substring(content_id from 19) as bucket, jbe.key as archive - from sample - join lateral jsonb_each(copies) jbe on true - where jbe.value->>'status' = 'present' - ), bucketed as ( - select bucket, archive, count(*) as count - from data - group by bucket, archive - ) update content_archive_counts cac set - count = cac.count + bucketed.count - from bucketed - where cac.archive = bucketed.archive and cac.bucket = bucketed.bucket; -$$; - -comment on function count_copies(bytea, bytea) is 'Count the objects between from_id and to_id, add the results to content_archive_counts'; - -CREATE FUNCTION init_content_archive_counts() returns void language sql as $$ - insert into content_archive_counts ( - select id, decode(lpad(to_hex(bucket), 4, '0'), 'hex')::bucket as bucket, 0 as count - from archive join lateral generate_series(0, 65535) bucket on true - ) on conflict (archive, bucket) do nothing; -$$; - -comment on function init_content_archive_counts() is 'Initialize the content archive counts for the registered archives'; - --- keep the content_archive_counts updated -CREATE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ - DECLARE - content_id sha1; - content_bucket bucket; - copies record; - old_row content_archive; - new_row content_archive; - BEGIN - -- default values for old or new row depending on trigger type - if tg_op = 'INSERT' then - old_row := (null::sha1, '{}'::jsonb, 0); - else - old_row := old; - end if; - if tg_op = 'DELETE' then - new_row := (null::sha1, '{}'::jsonb, 0); - else - new_row := new; - end if; - - -- get the content bucket - content_id := coalesce(old_row.content_id, new_row.content_id); - content_bucket := substring(content_id from 19)::bucket; - - -- compare copies present in old and new row for each archive type - FOR copies IN - select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status - from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key - LOOP - -- the count didn't change - CONTINUE WHEN copies.old_status is distinct from copies.new_status OR - (copies.old_status != 'present' AND copies.new_status != 'present'); - - update content_archive_counts cac - set count = count + (case when copies.old_status = 'present' then -1 else 1 end) - where archive = copies.archive and bucket = content_bucket; - END LOOP; - return null; - END; -$$; - -create trigger update_content_archive_counts - AFTER INSERT OR UPDATE OR DELETE ON content_archive - FOR EACH ROW - EXECUTE PROCEDURE 
update_content_archive_counts(); diff --git a/sql/archiver/upgrades/007.sql b/sql/archiver/upgrades/007.sql deleted file mode 100644 index 34049426e..000000000 --- a/sql/archiver/upgrades/007.sql +++ /dev/null @@ -1,21 +0,0 @@ --- SWH DB schema upgrade --- from_version: 6 --- to_version: 7 --- description: Add a function to compute archive counts - -INSERT INTO dbversion(version, release, description) -VALUES(7, now(), 'Work In Progress'); - -create type content_archive_count as ( - archive text, - count bigint -); - -create or replace function get_content_archive_counts() returns setof content_archive_count language sql as $$ - select archive, sum(count)::bigint - from content_archive_counts - group by archive - order by archive; -$$; - -comment on function get_content_archive_counts() is 'Get count for each archive'; diff --git a/sql/archiver/upgrades/008.sql b/sql/archiver/upgrades/008.sql deleted file mode 100644 index 6527aca69..000000000 --- a/sql/archiver/upgrades/008.sql +++ /dev/null @@ -1,49 +0,0 @@ --- SWH DB schema upgrade --- from_version: 7 --- to_version: 8 --- description: Fix silly bug in update_content_archive_counts - -INSERT INTO dbversion(version, release, description) -VALUES(8, now(), 'Work In Progress'); - --- keep the content_archive_counts updated -CREATE OR REPLACE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ - DECLARE - content_id sha1; - content_bucket bucket; - copies record; - old_row content_archive; - new_row content_archive; - BEGIN - -- default values for old or new row depending on trigger type - if tg_op = 'INSERT' then - old_row := (null::sha1, '{}'::jsonb, 0); - else - old_row := old; - end if; - if tg_op = 'DELETE' then - new_row := (null::sha1, '{}'::jsonb, 0); - else - new_row := new; - end if; - - -- get the content bucket - content_id := coalesce(old_row.content_id, new_row.content_id); - content_bucket := substring(content_id from 19)::bucket; - - -- compare copies present in old and new row for each archive type - FOR copies IN - select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status - from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key - LOOP - -- the count didn't change - CONTINUE WHEN copies.old_status is not distinct from copies.new_status OR - (copies.old_status != 'present' AND copies.new_status != 'present'); - - update content_archive_counts cac - set count = count + (case when copies.old_status = 'present' then -1 else 1 end) - where archive = copies.archive and bucket = content_bucket; - END LOOP; - return null; - END; -$$; diff --git a/sql/archiver/upgrades/009.sql b/sql/archiver/upgrades/009.sql deleted file mode 100644 index 5a3133bab..000000000 --- a/sql/archiver/upgrades/009.sql +++ /dev/null @@ -1,42 +0,0 @@ --- SWH Archiver DB schema upgrade --- from_version: 8 --- to_version: 9 --- description: Add helper functions to create temporary table and insert new entries in content_archive table - -insert into dbversion(version, release, description) -values(9, now(), 'Work In Progress'); - --- create a temporary table called tmp_TBLNAME, mimicking existing --- table TBLNAME -create or replace function swh_mktemp(tblname regclass) - returns void - language plpgsql -as $$ -begin - execute format(' - create temporary table tmp_%1$I - (like %1$I including defaults) - on commit drop; - ', tblname); - return; -end -$$; - -comment on function swh_mktemp(regclass) is 'Helper function to create a 
temporary table mimicking the existing one'; - --- Helper function to insert new entries in content_archive from a --- temporary table skipping duplicates. -create or replace function swh_content_archive_add() - returns void - language plpgsql -as $$ -begin - insert into content_archive (content_id, copies, num_present) - select distinct content_id, copies, num_present - from tmp_content_archive - on conflict(content_id) do nothing; - return; -end -$$; - -comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive'; diff --git a/swh/storage/archiver/__init__.py b/swh/storage/archiver/__init__.py deleted file mode 100644 index 2ff1cce13..000000000 --- a/swh/storage/archiver/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .director import ArchiverWithRetentionPolicyDirector # NOQA -from .director import ArchiverStdinToBackendDirector # NOQA -from .worker import ArchiverWithRetentionPolicyWorker # NOQA -from .worker import ArchiverToBackendWorker # NOQA -from .copier import ArchiverCopier # NOQA diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py deleted file mode 100644 index 1832e2bd2..000000000 --- a/swh/storage/archiver/copier.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -import logging - -from swh.objstorage.exc import ObjNotFoundError -from swh.model import hashutil - -logger = logging.getLogger('archiver.worker.copier') - - -class ArchiverCopier(): - """ This archiver copy some files into a remote objstorage - in order to get a backup. - """ - def __init__(self, source, destination, content_ids): - """ Create a Copier for the archiver - - Args: - source (ObjStorage): source storage to get the contents. - destination (ObjStorage): Storage where the contents will - be copied. - content_ids: list of content's id to archive. - """ - self.source = source - self.destination = destination - self.content_ids = content_ids - - def run(self): - """ Do the copy on the backup storage. - - Run the archiver copier in order to copy the required content - into the current destination. - The content which corresponds to the sha1 in self.content_ids - will be fetched from the master_storage and then copied into - the backup object storage. - - Returns: - A boolean that indicates if the whole content have been copied. 
- """ - try: - for content_id in self.content_ids: - try: - content = self.source.get(content_id) - except ObjNotFoundError: - logging.error('content %s not found' % - hashutil.hash_to_hex(content_id)) - continue - self.destination.add(content, content_id) - except Exception as e: - logger.exception('Problem during copy: %s' % e) - return False - return True diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py deleted file mode 100644 index d9e5e4399..000000000 --- a/swh/storage/archiver/db.py +++ /dev/null @@ -1,224 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime - -from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure - - -def utcnow(): - return datetime.datetime.now(tz=datetime.timezone.utc) - - -class ArchiverDb(BaseDb): - """Proxy to the SWH's archiver DB - - """ - - def archive_ls(self, cur=None): - """ Get all the archives registered on the server. - - Yields: - a tuple (server_id, server_url) for each archive server. - """ - cur = self._cursor(cur) - cur.execute("SELECT * FROM archive") - yield from cursor_to_bytes(cur) - - def content_archive_get(self, content_id, cur=None): - """ Get the archival status of a content in a specific server. - - Retrieve from the database the archival status of the given content - in the given archive server. - - Args: - content_id: the sha1 of the content. - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - """ - query = """select archive.name, status, mtime - from content_copies - left join archive on content_copies.archive_id = archive.id - where content_copies.content_id = ( - select id from content where sha1 = %s) - """ - cur = self._cursor(cur) - cur.execute(query, (content_id,)) - rows = cur.fetchall() - if not rows: - return None - present = [] - ongoing = {} - for archive, status, mtime in rows: - if status == 'present': - present.append(archive) - elif status == 'ongoing': - ongoing[archive] = mtime - return (content_id, present, ongoing) - - def content_archive_get_copies(self, last_content=None, limit=1000, - cur=None): - """Get the list of copies for `limit` contents starting after - `last_content`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. 
- - """ - - vars = { - 'limit': limit, - } - - if last_content is None: - last_content_clause = 'true' - else: - last_content_clause = """content_id > ( - select id from content - where sha1 = %(last_content)s)""" - vars['last_content'] = last_content - - query = """select - (select sha1 from content where id = content_id), - array_agg((select name from archive - where id = archive_id)) - from content_copies - where status = 'present' and %s - group by content_id - order by content_id - limit %%(limit)s""" % last_content_clause - - cur = self._cursor(cur) - cur.execute(query, vars) - for content_id, present in cursor_to_bytes(cur): - yield (content_id, present, {}) - - def content_archive_get_unarchived_copies( - self, retention_policy, last_content=None, - limit=1000, cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. Yields only copies with number of present - smaller than `retention policy`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - retention_policy: number of required present copies - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - - vars = { - 'limit': limit, - 'retention_policy': retention_policy, - } - - if last_content is None: - last_content_clause = 'true' - else: - last_content_clause = """content_id > ( - select id from content - where sha1 = %(last_content)s)""" - vars['last_content'] = last_content - - query = """select - (select sha1 from content where id = content_id), - array_agg((select name from archive - where id = archive_id)) - from content_copies - where status = 'present' and %s - group by content_id - having count(archive_id) < %%(retention_policy)s - order by content_id - limit %%(limit)s""" % last_content_clause - - cur = self._cursor(cur) - cur.execute(query, vars) - for content_id, present in cursor_to_bytes(cur): - yield (content_id, present, {}) - - @stored_procedure('swh_mktemp_content_archive') - def mktemp_content_archive(self, cur=None): - """Trigger the creation of the temporary table tmp_content_archive - during the lifetime of the transaction. - - """ - pass - - @stored_procedure('swh_content_archive_add') - def content_archive_add_from_temp(self, cur=None): - """Add new content archive entries from temporary table. - - Use from archiver.storage module: - self.db.mktemp_content_archive() - # copy data over to the temp table - self.db.copy_to([{'colname': id0}, {'colname': id1}], - 'tmp_cache_content', - ['colname'], cur) - # insert into the main table - self.db.add_content_archive_from_temp(cur) - - """ - pass - - def content_archive_get_missing(self, backend_name, cur=None): - """Retrieve the content missing from backend_name. - - """ - cur = self._cursor(cur) - cur.execute("select * from swh_content_archive_missing(%s)", - (backend_name,)) - yield from cursor_to_bytes(cur) - - def content_archive_get_unknown(self, cur=None): - """Retrieve unknown sha1 from archiver db. - - """ - cur = self._cursor(cur) - cur.execute('select * from swh_content_archive_unknown()') - yield from cursor_to_bytes(cur) - - def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): - """ Update the status of an archive content and set its mtime to - - Change the mtime of an archived content for the given archive and set - it's mtime to the current time. 
- - Args: - content_id (str): content sha1 - archive_id (str): name of the archive - new_status (str): one of 'missing', 'present' or 'ongoing'. - this status will replace the previous one. If not given, - the function only change the mtime of the content for the - given archive. - """ - assert isinstance(content_id, bytes) - assert new_status is not None - - query = """insert into content_copies (archive_id, content_id, status, mtime) - values ((select id from archive where name=%s), - (select id from content where sha1=%s), - %s, %s) - on conflict (archive_id, content_id) do - update set status = excluded.status, mtime = excluded.mtime - """ - - cur = self._cursor(cur) - cur.execute(query, (archive_id, content_id, new_status, utcnow())) diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py deleted file mode 100644 index 20b36a366..000000000 --- a/swh/storage/archiver/director.py +++ /dev/null @@ -1,339 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import abc -import logging -import sys -import time - -import click - -from swh.core import config, utils -from swh.model import hashutil -from swh.objstorage import get_objstorage -from swh.scheduler.utils import get_task - -from . import tasks # noqa -from .storage import get_archiver_storage - - -class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta): - """Abstract Director class - - An archiver director is in charge of dispatching batch of - contents to archiver workers (for them to archive). - - Inherit from this class and provide: - - ADDITIONAL_CONFIG: Some added configuration needed for the - director to work - - CONFIG_BASE_FILENAME: relative path to lookup for the - configuration file - - def get_contents_to_archive(self): Implementation method to read - contents to archive - - """ - DEFAULT_CONFIG = { - 'batch_max_size': ('int', 1500), - 'asynchronous': ('bool', True), - 'max_queue_length': ('int', 100000), - 'queue_throttling_delay': ('int', 120), - - 'archiver_storage': ('dict', { - 'cls': 'db', - 'args': { - 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest', - }, - }), - } - - # Destined to be overridden by subclass - ADDITIONAL_CONFIG = {} - - # We use the same configuration file as the worker - CONFIG_BASE_FILENAME = 'archiver/worker' - - # The worker's task queue name to use - TASK_NAME = None - - def __init__(self): - """ Constructor of the archiver director. - - Args: - db_conn_archiver: Either a libpq connection string, - or a psycopg2 connection for the archiver db. - config: optionnal additional configuration. Keys in the dict will - override the one parsed from the configuration file. - """ - super().__init__() - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - self.archiver_storage = get_archiver_storage( - **self.config['archiver_storage']) - self.task = get_task(self.TASK_NAME) - self.max_queue_length = self.config['max_queue_length'] - self.throttling_delay = self.config['queue_throttling_delay'] - - def run(self): - """ Run the archiver director. - - The archiver director will check all the contents of the archiver - database and do the required backup jobs. 
- """ - if self.config['asynchronous']: - run_fn = self.run_async_worker - else: - run_fn = self.run_sync_worker - - for batch in self.read_batch_contents(): - run_fn(batch) - - def run_async_worker(self, batch): - """Produce a worker that will be added to the task queue. - - """ - max_length = self.max_queue_length - throttling_delay = self.throttling_delay - - while True: - length = self.task.app.get_queue_length(self.task.task_queue) - if length >= max_length: - logging.info( - 'queue length %s >= %s, throttling for %s seconds' % ( - length, - max_length, - throttling_delay, - ) - ) - time.sleep(throttling_delay) - else: - break - - self.task.delay(batch=batch) - - def run_sync_worker(self, batch): - """Run synchronously a worker on the given batch. - - """ - self.task(batch=batch) - - def read_batch_contents(self): - """ Create batch of contents that needs to be archived - - Yields: - batch of sha1 that corresponds to contents that needs more archive - copies. - """ - contents = [] - for content in self.get_contents_to_archive(): - contents.append(content) - if len(contents) > self.config['batch_max_size']: - yield contents - contents = [] - if len(contents) > 0: - yield contents - - @abc.abstractmethod - def get_contents_to_archive(self): - """Retrieve generator of sha1 to archive - - Yields: - sha1 to archive - - """ - pass - - -class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase): - """Process the files in order to know which one is needed as backup. - - The archiver director processes the files in the local storage in order - to know which one needs archival and it delegates this task to - archiver workers. - """ - - ADDITIONAL_CONFIG = { - 'retention_policy': ('int', 2), - } - - TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask' - - def __init__(self, start_id): - super().__init__() - if start_id is not None: - self.start_id = hashutil.hash_to_bytes(start_id) - else: - self.start_id = None - - def get_contents_to_archive(self): - """Create batch of contents that needs to be archived - - Yields: - Datas about a content as a tuple - (content_id, present_copies, ongoing_copies) where ongoing_copies - is a dict mapping copy to mtime. - - """ - last_content = self.start_id - while True: - archiver_contents = list( - self.archiver_storage.content_archive_get_unarchived_copies( - last_content=last_content, - retention_policy=self.config['retention_policy'], - limit=self.config['batch_max_size'])) - if not archiver_contents: - return - for content_id, _, _ in archiver_contents: - last_content = content_id - yield content_id - - -def read_sha1_from_stdin(): - """Read sha1 from stdin. - - """ - for line in sys.stdin: - sha1 = line.strip() - try: - yield hashutil.hash_to_bytes(sha1) - except Exception: - print("%s is not a valid sha1 hash, continuing" % repr(sha1), - file=sys.stderr) - continue - - -class ArchiverStdinToBackendDirector(ArchiverDirectorBase): - """A cloud archiver director in charge of reading contents and send - them in batch in the cloud. - - The archiver director, in order: - - Reads sha1 to send to a specific backend. - - Checks if those sha1 are known in the archiver. If they are not, - add them - - if the sha1 are missing, they are sent for the worker to archive - - If the flag force_copy is set, this will force the copy to be sent - for archive even though it has already been done. 
- - """ - ADDITIONAL_CONFIG = { - 'destination': ('str', 'azure'), - 'force_copy': ('bool', False), - 'source': ('str', 'uffizi'), - 'storages': ('list[dict]', - [ - {'host': 'uffizi', - 'cls': 'pathslicing', - 'args': {'root': '/tmp/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}, - {'host': 'banco', - 'cls': 'remote', - 'args': {'base_url': 'http://banco:5003/'}} - ]) - } - - CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' - - TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask' - - def __init__(self): - super().__init__() - self.destination = self.config['destination'] - self.force_copy = self.config['force_copy'] - self.objstorages = { - storage['host']: get_objstorage(storage['cls'], storage['args']) - for storage in self.config.get('storages', []) - } - # Fallback objstorage - self.source = self.config['source'] - - def _add_unknown_content_ids(self, content_ids): - """Check whether some content_id are unknown. - If they are, add them to the archiver db. - - Args: - content_ids: List of dict with one key content_id - - """ - source_objstorage = self.objstorages[self.source] - - self.archiver_storage.content_archive_add( - (h - for h in content_ids - if h in source_objstorage), - sources_present=[self.source]) - - def get_contents_to_archive(self): - gen_content_ids = ( - ids for ids in utils.grouper(read_sha1_from_stdin(), - self.config['batch_max_size'])) - - if self.force_copy: - for content_ids in gen_content_ids: - content_ids = list(content_ids) - - if not content_ids: - continue - - # Add missing entries in archiver table - self._add_unknown_content_ids(content_ids) - - print('Send %s contents to archive' % len(content_ids)) - - for content_id in content_ids: - # force its status to missing - self.archiver_storage.content_archive_update( - content_id, self.destination, 'missing') - yield content_id - - else: - for content_ids in gen_content_ids: - content_ids = list(content_ids) - - # Add missing entries in archiver table - self._add_unknown_content_ids(content_ids) - - # Filter already copied data - content_ids = list( - self.archiver_storage.content_archive_get_missing( - content_ids=content_ids, - backend_name=self.destination)) - - if not content_ids: - continue - - print('Send %s contents to archive' % len(content_ids)) - - for content in content_ids: - yield content - - def run_async_worker(self, batch): - """Produce a worker that will be added to the task queue. - - """ - self.task.delay(destination=self.destination, batch=batch) - - def run_sync_worker(self, batch): - """Run synchronously a worker on the given batch. 
- - """ - self.task(destination=self.destination, batch=batch) - - -@click.command() -@click.option('--direct', is_flag=True, - help="""The archiver sends content for backup to -one storage.""") -@click.option('--start-id', default=None, help="The first id to process") -def launch(direct, start_id): - if direct: - archiver = ArchiverStdinToBackendDirector() - else: - archiver = ArchiverWithRetentionPolicyDirector(start_id) - - archiver.run() - - -if __name__ == '__main__': - launch() diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py deleted file mode 100644 index 977105825..000000000 --- a/swh/storage/archiver/storage.py +++ /dev/null @@ -1,361 +0,0 @@ -# Copyright (C) 2016-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import json -import os -import psycopg2 -import time - -from .db import ArchiverDb - -from swh.model import hashutil -from swh.storage.common import db_transaction_generator, db_transaction -from swh.storage.exc import StorageDBError - - -class ArchiverStorage(): - """SWH Archiver storage proxy, encompassing DB - - """ - def __init__(self, dbconn): - """ - Args: - db_conn: either a libpq connection string, or a psycopg2 connection - - """ - try: - if isinstance(dbconn, psycopg2.extensions.connection): - self.db = ArchiverDb(dbconn) - else: - self.db = ArchiverDb.connect(dbconn) - except psycopg2.OperationalError as e: - raise StorageDBError(e) - - @db_transaction_generator - def archive_ls(self, cur=None): - """ Get all the archives registered on the server. - - Yields: - a tuple (server_id, server_url) for each archive server. - """ - yield from self.db.archive_ls(cur) - - @db_transaction - def content_archive_get(self, content_id, cur=None): - """ Get the archival status of a content. - - Retrieve from the database the archival status of the given content - - Args: - content_id: the sha1 of the content - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - """ - return self.db.content_archive_get(content_id, cur) - - @db_transaction_generator - def content_archive_get_copies(self, last_content=None, limit=1000, - cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - yield from self.db.content_archive_get_copies(last_content, limit, - cur) - - @db_transaction_generator - def content_archive_get_unarchived_copies( - self, retention_policy, last_content=None, - limit=1000, cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. Yields only copies with number of present - smaller than `retention policy`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - retention_policy: number of required present copies - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. 
- - """ - yield from self.db.content_archive_get_unarchived_copies( - retention_policy, last_content, limit, cur) - - @db_transaction_generator - def content_archive_get_missing(self, content_ids, backend_name, cur=None): - """Retrieve missing sha1s from source_name. - - Args: - content_ids ([sha1s]): list of sha1s to test - source_name (str): Name of the backend to check for content - - Yields: - missing sha1s from backend_name - - """ - db = self.db - - db.mktemp_content_archive() - - db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) - - for content_id in db.content_archive_get_missing(backend_name, cur): - yield content_id[0] - - @db_transaction_generator - def content_archive_get_unknown(self, content_ids, cur=None): - """Retrieve unknown sha1s from content_archive. - - Args: - content_ids ([sha1s]): list of sha1s to test - - Yields: - Unknown sha1s from content_archive - - """ - db = self.db - - db.mktemp_content_archive() - - db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) - - for content_id in db.content_archive_get_unknown(cur): - yield content_id[0] - - @db_transaction - def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): - """ Update the status of an archive content and set its mtime to now - - Change the mtime of an archived content for the given archive and set - it's mtime to the current time. - - Args: - content_id (str): content sha1 - archive_id (str): name of the archive - new_status (str): one of 'missing', 'present' or 'ongoing'. - this status will replace the previous one. If not given, - the function only change the mtime of the content for the - given archive. - """ - self.db.content_archive_update(content_id, archive_id, new_status, cur) - - @db_transaction - def content_archive_add( - self, content_ids, sources_present, cur=None): - """Insert a new entry in db about content_id. 
- - Args: - content_ids ([bytes|str]): content identifiers - sources_present ([str]): List of source names where - contents are present - """ - db = self.db - - # Prepare copies dictionary - copies = {} - for source in sources_present: - copies[source] = { - "status": "present", - "mtime": int(time.time()), - } - - copies = json.dumps(copies) - num_present = len(sources_present) - - db.mktemp('content_archive') - db.copy_to( - ({'content_id': id, - 'copies': copies, - 'num_present': num_present} - for id in content_ids), - 'tmp_content_archive', - ['content_id', 'copies', 'num_present'], - cur) - db.content_archive_add_from_temp(cur) - - -class StubArchiverStorage(): - def __init__(self, archives, present, missing, logfile_base): - """ - A stub storage for the archiver that doesn't write to disk - - Args: - - archives: a dictionary mapping archive names to archive URLs - - present: archives where the objects are all considered present - - missing: archives where the objects are all considered missing - - logfile_base: basename for the logfile - """ - self.archives = archives - self.present = set(present) - self.missing = set(missing) - if set(archives) != self.present | self.missing: - raise ValueError("Present and missing archives don't match") - self.logfile_base = logfile_base - self.__logfile = None - - def open_logfile(self): - if self.__logfile: - return - - logfile_name = "%s.%d" % (self.logfile_base, os.getpid()) - self.__logfile = open(logfile_name, 'a') - - def close_logfile(self): - if not self.__logfile: - return - - self.__logfile.close() - self.__logfile = None - - def archive_ls(self, cur=None): - """ Get all the archives registered on the server. - - Yields: - a tuple (server_id, server_url) for each archive server. - """ - yield from self.archives.items() - - def content_archive_get(self, content_id, cur=None): - """ Get the archival status of a content. - - Retrieve from the database the archival status of the given content - - Args: - content_id: the sha1 of the content - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - """ - return (content_id, self.present, {}) - - def content_archive_get_copies(self, last_content=None, limit=1000, - cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - yield from [] - - def content_archive_get_unarchived_copies(self, retention_policy, - last_content=None, limit=1000, - cur=None): - """ Get the list of copies for `limit` contents starting after - `last_content`. Yields only copies with number of present - smaller than `retention policy`. - - Args: - last_content: sha1 of the last content retrieved. May be None - to start at the beginning. - retention_policy: number of required present copies - limit: number of contents to retrieve. Can be None to retrieve all - objects (will be slow). - - Yields: - A tuple (content_id, present_copies, ongoing_copies), where - ongoing_copies is a dict mapping copy to mtime. - - """ - yield from [] - - def content_archive_get_missing(self, content_ids, backend_name, cur=None): - """Retrieve missing sha1s from source_name. 
- - Args: - content_ids ([sha1s]): list of sha1s to test - source_name (str): Name of the backend to check for content - - Yields: - missing sha1s from backend_name - - """ - if backend_name in self.missing: - yield from content_ids - elif backend_name in self.present: - yield from [] - else: - raise ValueError('Unknown backend `%s`' % backend_name) - - def content_archive_get_unknown(self, content_ids, cur=None): - """Retrieve unknown sha1s from content_archive. - - Args: - content_ids ([sha1s]): list of sha1s to test - - Yields: - Unknown sha1s from content_archive - - """ - yield from [] - - def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): - """ Update the status of an archive content and set its mtime to now - - Change the mtime of an archived content for the given archive and set - it's mtime to the current time. - - Args: - content_id (str): content sha1 - archive_id (str): name of the archive - new_status (str): one of 'missing', 'present' or 'ongoing'. - this status will replace the previous one. If not given, - the function only change the mtime of the content for the - given archive. - """ - if not self.__logfile: - self.open_logfile() - - print(time.time(), archive_id, new_status, - hashutil.hash_to_hex(content_id), file=self.__logfile) - - def content_archive_add( - self, content_ids, sources_present, cur=None): - """Insert a new entry in db about content_id. - - Args: - content_ids ([bytes|str]): content identifiers - sources_present ([str]): List of source names where - contents are present - """ - pass - - -def get_archiver_storage(cls, args): - """Instantiate an archiver database with the proper class and arguments""" - if cls == 'db': - return ArchiverStorage(**args) - elif cls == 'stub': - return StubArchiverStorage(**args) - else: - raise ValueError('Unknown Archiver Storage class `%s`' % cls) diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py deleted file mode 100644 index ccb0a2f63..000000000 --- a/swh/storage/archiver/tasks.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (C) 2015-2016 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from swh.scheduler.task import Task -from .worker import ArchiverWithRetentionPolicyWorker -from .worker import ArchiverToBackendWorker - - -class SWHArchiverWithRetentionPolicyTask(Task): - """Main task that archive a batch of content. - - """ - task_queue = 'swh_storage_archive_worker' - - def run_task(self, *args, **kwargs): - ArchiverWithRetentionPolicyWorker(*args, **kwargs).run() - - -class SWHArchiverToBackendTask(Task): - """Main task that archive a batch of content in the cloud. 
- - """ - task_queue = 'swh_storage_archive_worker_to_backend' - - def run_task(self, *args, **kwargs): - ArchiverToBackendWorker(*args, **kwargs).run() diff --git a/swh/storage/archiver/updater.py b/swh/storage/archiver/updater.py deleted file mode 100644 index 64cf1cea4..000000000 --- a/swh/storage/archiver/updater.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import logging - -from swh.journal.client import SWHJournalClient - -from .storage import get_archiver_storage - - -class SWHArchiverContentUpdater(SWHJournalClient): - """Client in charge of updating new contents in the content_archiver - db. - - This is a swh.journal client only dealing with contents. - - """ - CONFIG_BASE_FILENAME = 'archiver/content_updater' - - ADDITIONAL_CONFIG = { - 'archiver_storage': ( - 'dict', { - 'cls': 'db', - 'args': { - 'dbconn': 'dbname=softwareheritage-archiver-dev ' - 'user=guest', - } - }), - 'sources_present': ('list[str]', ['uffizi']) - } - - def __init__(self): - # Only interested in content here so override the configuration - super().__init__(extra_configuration={'object_types': ['content']}) - - self.sources_present = self.config['sources_present'] - - self.archiver_storage = get_archiver_storage( - **self.config['archiver_storage']) - - def process_objects(self, messages): - self.archiver_storage.content_archive_add( - (c[b'sha1'] for c in messages['content']), - self.sources_present) - - -if __name__ == '__main__': - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s %(process)d %(levelname)s %(message)s' - ) - - content_updater = SWHArchiverContentUpdater() - content_updater.process() diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py deleted file mode 100644 index c94d6f152..000000000 --- a/swh/storage/archiver/worker.py +++ /dev/null @@ -1,429 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import abc -import datetime -import logging -import random - -from collections import defaultdict -from celery import group - -from swh.core import config, utils -from swh.objstorage import get_objstorage -from swh.objstorage.exc import Error, ObjNotFoundError -from swh.model import hashutil -from swh.scheduler.utils import get_task - -from .storage import get_archiver_storage -from .copier import ArchiverCopier - - -logger = logging.getLogger('archiver.worker') - - -class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta): - """Base archive worker. 
- - Inherit from this class and override: - - ADDITIONAL_CONFIG: Some added configuration needed for the - director to work - - CONFIG_BASE_FILENAME: relative path to lookup for the - configuration file - - def need_archival(self, content_data): Determine if a content - needs archival or not - - def choose_backup_servers(self, present, missing): Choose - which backup server to send copies to - - """ - DEFAULT_CONFIG = { - 'archiver_storage': ('dict', { - 'cls': 'db', - 'args': { - 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest', - }, - }), - 'storages': ('list[dict]', - [ - {'host': 'uffizi', - 'cls': 'pathslicing', - 'args': {'root': '/tmp/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6'}}, - {'host': 'banco', - 'cls': 'remote', - 'args': {'base_url': 'http://banco:5003/'}} - ]) - } - - ADDITIONAL_CONFIG = {} - - CONFIG_BASE_FILENAME = 'archiver/worker' - - objstorages = {} - - def __init__(self, batch): - super().__init__() - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - self.batch = batch - self.archiver_db = get_archiver_storage( - **self.config['archiver_storage']) - self.objstorages = { - storage['host']: get_objstorage(storage['cls'], storage['args']) - for storage in self.config.get('storages', []) - } - self.set_objstorages = set(self.objstorages) - - def run(self): - """Do the task expected from the archiver worker. - - Process the contents in self.batch, ensure that the elements - still need an archival (using archiver db), and spawn copiers - to copy files in each destination according to the - archiver-worker's policy. - - """ - transfers = defaultdict(list) - for obj_id in self.batch: - # Get dict {'missing': [servers], 'present': [servers]} - # for contents ignoring those who don't need archival. - copies = self.compute_copies(self.set_objstorages, obj_id) - if not copies: # could not happen if using .director module - msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id) - logger.warning(msg) - continue - - if not self.need_archival(copies): - continue - - present = copies.get('present', set()) - missing = copies.get('missing', set()) - if len(present) == 0: - msg = 'Lost content %s' % hashutil.hash_to_hex(obj_id) - logger.critical(msg) - continue - - # Choose servers to be used as srcs and dests. - for src_dest in self.choose_backup_servers(present, missing): - transfers[src_dest].append(obj_id) - - # Then run copiers for each of the required transfers. - contents_copied = [] - for (src, dest), content_ids in transfers.items(): - contents_copied.extend(self.run_copier(src, dest, content_ids)) - - # copy is done, eventually do something else with them - self.copy_finished(contents_copied) - - def compute_copies(self, set_objstorages, content_id): - """From a content_id, return present and missing copies. - - Args: - objstorages (set): objstorage's id name - content_id: the content concerned - - Returns: - A dictionary with the following keys: - - 'present': set of archives where the content is present - - 'missing': set of archives where the content is missing - - 'ongoing': ongoing copies: dict mapping the archive id - with the time the copy supposedly started. 
- """ - result = self.archiver_db.content_archive_get(content_id) - if not result: - return None - _, present, ongoing = result - set_present = set_objstorages & set(present) - set_ongoing = set_objstorages & set(ongoing) - set_missing = set_objstorages - set_present - set_ongoing - return { - 'present': set_present, - 'missing': set_missing, - 'ongoing': {archive: value - for archive, value in ongoing.items() - if archive in set_ongoing}, - } - - def run_copier(self, source, destination, content_ids): - """Run a copier in order to archive the given contents. - - Upload the given contents from the source to the destination. - If the process fails, the whole content is considered uncopied - and remains 'ongoing', waiting to be rescheduled as there is a - delay. - - Args: - source (str): source storage's identifier - destination (str): destination storage's identifier - content_ids ([sha1]): list of content ids to archive. - - """ - # Check if there are any errors among the contents. - content_status = self.get_contents_error(content_ids, source) - - # Iterates over the error detected. - for content_id, real_status in content_status.items(): - # Remove them from the to-archive list, - # as they cannot be retrieved correctly. - content_ids.remove(content_id) - # Update their status to reflect their real state. - self.archiver_db.content_archive_update( - content_id, archive_id=source, new_status=real_status) - - # Now perform the copy on the remaining contents - ac = ArchiverCopier( - source=self.objstorages[source], - destination=self.objstorages[destination], - content_ids=content_ids) - - if ac.run(): - # Once the archival complete, update the database. - for content_id in content_ids: - self.archiver_db.content_archive_update( - content_id, archive_id=destination, new_status='present') - - return content_ids - return [] - - def copy_finished(self, content_ids): - """Hook to notify the content_ids archive copy is finished. - (This is not an abstract method as this is optional - """ - pass - - def get_contents_error(self, content_ids, source_storage): - """Indicates what is the error associated to a content when needed - - Check the given content on the given storage. If an error is detected, - it will be reported through the returned dict. - - Args: - content_ids ([sha1]): list of content ids to check - source_storage (str): the source storage holding the - contents to check. - - Returns: - a dict that map {content_id -> error_status} for each content_id - with an error. The `error_status` result may be 'missing' or - 'corrupted'. - - """ - content_status = {} - storage = self.objstorages[source_storage] - for content_id in content_ids: - try: - storage.check(content_id) - except Error: - content_status[content_id] = 'corrupted' - logger.error('%s corrupted!' % hashutil.hash_to_hex( - content_id)) - except ObjNotFoundError: - content_status[content_id] = 'missing' - logger.error('%s missing!' % hashutil.hash_to_hex(content_id)) - - return content_status - - @abc.abstractmethod - def need_archival(self, content_data): - """Indicate if the content needs to be archived. - - Args: - content_data (dict): dict that contains two lists 'present' and - 'missing' with copies id corresponding to this status. - - Returns: - True if there is not enough copies, False otherwise. 
- - """ - pass - - @abc.abstractmethod - def choose_backup_servers(self, present, missing): - """Choose and yield the required amount of couple source/destination - - For each required copy, choose a unique destination server - among the missing copies and a source server among the - presents. - - Args: - present: set of objstorage source name where the content - is present - missing: set of objstorage destination name where the - content is missing - - Yields: - tuple (source (str), destination (src)) for each required copy. - - """ - pass - - -class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker): - """ Do the required backups on a given batch of contents. - - Process the content of a content batch in order to do the needed backups on - the slaves servers. - """ - - ADDITIONAL_CONFIG = { - 'retention_policy': ('int', 2), - 'archival_max_age': ('int', 3600), - 'sources': ('list[str]', ['uffizi', 'banco']), - } - - def __init__(self, batch): - """ Constructor of the ArchiverWorker class. - - Args: - batch: list of object's sha1 that potentially need archival. - """ - super().__init__(batch) - config = self.config - self.retention_policy = config['retention_policy'] - self.archival_max_age = config['archival_max_age'] - self.sources = config['sources'] - - if len(self.objstorages) < self.retention_policy: - raise ValueError('Retention policy is too high for the number of ' - 'provided servers') - - def need_archival(self, content_data): - """ Indicate if the content need to be archived. - - Args: - content_data (dict): dict that contains two lists 'present' and - 'missing' with copies id corresponding to this status. - Returns: True if there is not enough copies, False otherwise. - """ - nb_presents = len(content_data.get('present', [])) - for copy, mtime in content_data.get('ongoing', {}).items(): - if not self._is_archival_delay_elapsed(mtime): - nb_presents += 1 - return nb_presents < self.retention_policy - - def _is_archival_delay_elapsed(self, start_time): - """ Indicates if the archival delay is elapsed given the start_time - - Args: - start_time (float): time at which the archival started. - - Returns: - True if the archival delay is elasped, False otherwise - """ - elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - start_time - return elapsed > datetime.timedelta(seconds=self.archival_max_age) - - def choose_backup_servers(self, present, missing): - """Choose and yield the required amount of couple source/destination - - For each required copy, choose a unique destination server - among the missing copies and a source server among the - presents. - - Each destination server is unique so after archival, the - retention policy requirement will be fulfilled. However, the - source server may be used multiple times. - - Args: - present: set of objstorage source name where the content - is present - missing: set of objstorage destination name where the - content is missing - - Yields: - tuple (source, destination) for each required copy. - - """ - # Transform from set to list to allow random selections - missing = list(missing) - present = list(present) - all_sources = [source for source in present if source in self.sources] - nb_required = self.retention_policy - len(present) - destinations = random.sample(missing, nb_required) - sources = [random.choice(all_sources) for dest in destinations] - yield from zip(sources, destinations) - - -class ArchiverToBackendWorker(BaseArchiveWorker): - """Worker that sends copies over from a source to another backend. 
- - Process the content of a content batch from source objstorage to - destination objstorage. - - """ - - CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' - - ADDITIONAL_CONFIG = { - 'next_task': ( - 'dict', { - 'queue': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask', - 'batch_size': 10, - } - ) - } - - def __init__(self, destination, batch): - """Constructor of the ArchiverWorkerToBackend class. - - Args: - destination: where to copy the objects from - batch: sha1s to send to destination - - """ - super().__init__(batch) - self.destination = destination - next_task = self.config['next_task'] - if next_task: - destination_queue = next_task['queue'] - self.task_destination = get_task(destination_queue) - self.batch_size = int(next_task['batch_size']) - else: - self.task_destination = self.batch_size = None - - def need_archival(self, content_data): - """Indicate if the content needs to be archived. - - Args: - content_data (dict): dict that contains 3 lists 'present', - 'ongoing' and 'missing' with copies id corresponding to - this status. - - Returns: - True if we need to archive, False otherwise - - """ - return self.destination in content_data.get('missing', {}) - - def choose_backup_servers(self, present, missing): - """The destination is fixed to the destination mentioned. - - The only variable here is the source of information that we - choose randomly in 'present'. - - Args: - present: set of objstorage source name where the content - is present - missing: set of objstorage destination name where the - content is missing - - Yields: - tuple (source, destination) for each required copy. - - """ - yield (random.choice(list(present)), self.destination) - - def copy_finished(self, content_ids): - """Once the copy is finished, we'll send those batch of contents as - done in the destination queue. 
- - """ - if self.task_destination: - groups = [] - for ids in utils.grouper(content_ids, self.batch_size): - sig_ids = self.task_destination.s(list(ids)) - groups.append(sig_ids) - - group(groups).delay() diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py deleted file mode 100644 index 036d2121a..000000000 --- a/swh/storage/tests/test_archiver.py +++ /dev/null @@ -1,486 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime -import glob -import tempfile -import shutil -import unittest -import os - -from nose.tools import istest -from nose.plugins.attrib import attr - -from swh.core.tests.db_testing import DbsTestFixture - -from swh.storage.archiver.storage import get_archiver_storage - -from swh.storage.archiver import ArchiverWithRetentionPolicyDirector -from swh.storage.archiver import ArchiverWithRetentionPolicyWorker -from swh.storage.archiver.db import utcnow - -from swh.objstorage import get_objstorage -from swh.objstorage.exc import ObjNotFoundError - -try: - # objstorage > 0.17 - from swh.objstorage.api.server import make_app as app - from server_testing import ServerTestFixtureAsync as ServerTestFixture - MIGRATED = True -except ImportError: - # objstorage <= 0.17 - from swh.objstorage.api.server import app - from server_testing import ServerTestFixture - MIGRATED = False - -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') - - -@attr('db') -class TestArchiver(DbsTestFixture, ServerTestFixture, - unittest.TestCase): - """ Test the objstorage archiver. 
- """ - - TEST_DB_NAMES = [ - 'softwareheritage-archiver-test', - ] - TEST_DB_DUMPS = [ - os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), - ] - TEST_DB_DUMP_TYPES = [ - 'pg_dump', - ] - - def setUp(self): - # Launch the backup server - self.dest_root = tempfile.mkdtemp(prefix='remote') - self.config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6', - } - } - if MIGRATED: - self.app = app(self.config) - else: - self.app = app - super().setUp() - - # Retrieve connection (depends on the order in TEST_DB_NAMES) - self.conn = self.conns[0] # archiver db's connection - self.cursor = self.cursors[0] - - # Create source storage - self.src_root = tempfile.mkdtemp() - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - # Create destination storage - dest_config = { - 'cls': 'remote', - 'args': { - 'url': self.url() - } - } - self.dest_storage = get_objstorage(**dest_config) - - # Keep mapped the id to the storages - self.storages = { - 'uffizi': self.src_storage, - 'banco': self.dest_storage - } - - # Override configurations - src_archiver_conf = {'host': 'uffizi'} - dest_archiver_conf = {'host': 'banco'} - src_archiver_conf.update(src_config) - dest_archiver_conf.update(dest_config) - self.archiver_storages = [src_archiver_conf, dest_archiver_conf] - self._override_director_config() - self._override_worker_config() - # Create the base archiver - self.archiver = self._create_director() - - def tearDown(self): - self.empty_tables() - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - super().tearDown() - - def empty_tables(self): - # Remove all content - self.cursor.execute('DELETE FROM content') - self.cursor.execute('DELETE FROM content_copies') - self.conn.commit() - - def _override_director_config(self, retention_policy=2): - """ Override the default config of the Archiver director - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. - """ - ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'batch_max_size': 5000, - 'archival_max_age': 3600, - 'retention_policy': retention_policy, - 'asynchronous': False, - 'max_queue_length': 100000, - 'queue_throttling_delay': 120, - } - - def _override_worker_config(self): - """ Override the default config of the Archiver worker - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. - """ - ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa - 'retention_policy': 2, - 'archival_max_age': 3600, - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'storages': self.archiver_storages, - 'source': 'uffizi', - 'sources': ['uffizi'], - } - - def _create_director(self): - return ArchiverWithRetentionPolicyDirector(start_id=None) - - def _create_worker(self, batch={}): - return ArchiverWithRetentionPolicyWorker(batch) - - def _add_content(self, storage_name, content_data): - """ Add really a content to the given objstorage - - This put an empty status for the added content. 
- - Args: - storage_name: the concerned storage - content_data: the data to insert - with_row_insert: to insert a row entry in the db or not - - """ - # Add the content to the storage - obj_id = self.storages[storage_name].add(content_data) - self.cursor.execute(""" INSERT INTO content (sha1) - VALUES (%s) - """, (obj_id,)) - return obj_id - - def _update_status(self, obj_id, storage_name, status, date=None): - """ Update the db status for the given id/storage_name. - - This does not create the content in the storage. - """ - self.cursor.execute("""insert into archive (name) - values (%s) - on conflict do nothing""", (storage_name,)) - - self.archiver.archiver_storage.content_archive_update( - obj_id, storage_name, status - ) - - # Integration test - @istest - def archive_missing_content(self): - """ Run archiver on a missing content should archive it. - """ - obj_data = b'archive_missing_content' - obj_id = self._add_content('uffizi', obj_data) - self._update_status(obj_id, 'uffizi', 'present') - # Content is missing on banco (entry not present in the db) - try: - self.dest_storage.get(obj_id) - except ObjNotFoundError: - pass - else: - self.fail('Content should not be present before archival') - self.archiver.run() - # now the content should be present on remote objstorage - remote_data = self.dest_storage.get(obj_id) - self.assertEquals(obj_data, remote_data) - - @istest - def archive_present_content(self): - """ A content that is not 'missing' shouldn't be archived. - """ - obj_id = self._add_content('uffizi', b'archive_present_content') - self._update_status(obj_id, 'uffizi', 'present') - self._update_status(obj_id, 'banco', 'present') - # After the run, the content should NOT be in the archive. - # As the archiver believe it was already in. - self.archiver.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - @istest - def archive_already_enough(self): - """ A content missing with enough copies shouldn't be archived. - """ - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self._override_director_config(retention_policy=1) - director = self._create_director() - # Obj is present in only one archive but only one copy is required. - director.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - @istest - def content_archive_get_copies(self): - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [], - ) - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [(obj_id, ['uffizi'], {})], - ) - - # Unit tests for archive worker - - def archival_elapsed(self, mtime): - return self._create_worker()._is_archival_delay_elapsed(mtime) - - @istest - def vstatus_ongoing_remaining(self): - self.assertFalse(self.archival_elapsed(utcnow())) - - @istest - def vstatus_ongoing_elapsed(self): - past_time = (utcnow() - - datetime.timedelta( - seconds=self._create_worker().archival_max_age)) - self.assertTrue(self.archival_elapsed(past_time)) - - @istest - def need_archival_missing(self): - """ A content should need archival when it is missing. 
- """ - status_copies = {'present': ['uffizi'], 'missing': ['banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - True) - - @istest - def need_archival_present(self): - """ A content present everywhere shouldn't need archival - """ - status_copies = {'present': ['uffizi', 'banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - False) - - def _compute_copies_status(self, status): - """ A content with a given status should be detected correctly - """ - obj_id = self._add_content( - 'banco', b'compute_copies_' + bytes(status, 'utf8')) - self._update_status(obj_id, 'banco', status) - worker = self._create_worker() - self.assertIn('banco', worker.compute_copies( - set(worker.objstorages), obj_id)[status]) - - @istest - def compute_copies_present(self): - """ A present content should be detected with correct status - """ - self._compute_copies_status('present') - - @istest - def compute_copies_missing(self): - """ A missing content should be detected with correct status - """ - self._compute_copies_status('missing') - - @istest - def compute_copies_extra_archive(self): - obj_id = self._add_content('banco', b'foobar') - self._update_status(obj_id, 'banco', 'present') - self._update_status(obj_id, 'random_archive', 'present') - worker = self._create_worker() - copies = worker.compute_copies(set(worker.objstorages), obj_id) - self.assertEqual(copies['present'], {'banco'}) - self.assertEqual(copies['missing'], {'uffizi'}) - - def _get_backups(self, present, missing): - """ Return a list of the pair src/dest from the present and missing - """ - worker = self._create_worker() - return list(worker.choose_backup_servers(present, missing)) - - @istest - def choose_backup_servers(self): - self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) - self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) - # Even with more possible destinations, do not take more than the - # retention_policy require - self.assertEqual( - len(self._get_backups(['uffizi'], ['banco', 's3'])), - 1 - ) - - -class TestArchiverStorageStub(unittest.TestCase): - def setUp(self): - self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') - self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') - self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') - - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - # Create destination storage - dest_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.dest_storage = get_objstorage(**dest_config) - - self.config = { - 'cls': 'stub', - 'args': { - 'archives': { - 'present_archive': 'http://uffizi:5003', - 'missing_archive': 'http://banco:5003', - }, - 'present': ['present_archive'], - 'missing': ['missing_archive'], - 'logfile_base': os.path.join(self.log_root, 'log_'), - } - } - - # Generated with: - # - # id_length = 20 - # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') - # - self.content_ids = [ - b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", - b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', - b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', - b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', - b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', - ] - - self.archiver_storage = 
get_archiver_storage(**self.config) - super().setUp() - - def tearDown(self): - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - shutil.rmtree(self.log_root) - super().tearDown() - - @istest - def archive_ls(self): - self.assertCountEqual( - self.archiver_storage.archive_ls(), - self.config['args']['archives'].items() - ) - - @istest - def content_archive_get(self): - for content_id in self.content_ids: - self.assertEqual( - self.archiver_storage.content_archive_get(content_id), - (content_id, set(self.config['args']['present']), {}), - ) - - @istest - def content_archive_get_copies(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_copies(), - [], - ) - - @istest - def content_archive_get_unarchived_copies(self): - retention_policy = 2 - self.assertCountEqual( - self.archiver_storage.content_archive_get_unarchived_copies( - retention_policy), - [], - ) - - @istest - def content_archive_get_missing(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'missing_archive' - ), - self.content_ids, - ) - - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'present_archive' - ), - [], - ) - - with self.assertRaises(ValueError): - list(self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'unknown_archive' - )) - - @istest - def content_archive_get_unknown(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_unknown( - self.content_ids, - ), - [], - ) - - @istest - def content_archive_update(self): - for content_id in self.content_ids: - self.archiver_storage.content_archive_update( - content_id, 'present_archive', 'present') - self.archiver_storage.content_archive_update( - content_id, 'missing_archive', 'present') - - self.archiver_storage.close_logfile() - - # Make sure we created a logfile - files = glob.glob('%s*' % self.config['args']['logfile_base']) - self.assertEqual(len(files), 1) - - # make sure the logfile contains all our lines - lines = open(files[0]).readlines() - self.assertEqual(len(lines), 2 * len(self.content_ids))
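
Note (illustration only, not part of the patch): for readers who want a compact view of the behaviour being removed here, the sketch below restates the copy-selection policy of the deleted ArchiverWithRetentionPolicyWorker, i.e. its need_archival and choose_backup_servers methods. Packaging them as standalone functions, and the demo values under __main__, are choices made for this example and do not correspond to anything that exists in swh.storage after this patch.

    # Standalone sketch of the retention policy implemented by the removed
    # ArchiverWithRetentionPolicyWorker; function names mirror the deleted
    # methods, everything else is illustrative.
    import datetime
    import random


    def need_archival(copies, retention_policy, archival_max_age):
        # A content needs archival when fewer than `retention_policy` copies
        # are either present or still plausibly ongoing (the copy started
        # less than `archival_max_age` seconds ago).
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        nb_present = len(copies.get('present', ()))
        for _archive, start_time in copies.get('ongoing', {}).items():
            if now - start_time <= datetime.timedelta(
                    seconds=archival_max_age):
                nb_present += 1
        return nb_present < retention_policy


    def choose_backup_servers(present, missing, sources, retention_policy):
        # One unique destination per missing copy needed (random.sample),
        # and for each destination a source drawn among the archives that
        # both hold the object and are allowed as sources (random.choice,
        # so a source may be reused).
        present = list(present)
        missing = list(missing)
        all_sources = [s for s in present if s in sources]
        nb_required = retention_policy - len(present)
        destinations = random.sample(missing, nb_required)
        chosen_sources = [random.choice(all_sources) for _ in destinations]
        yield from zip(chosen_sources, destinations)


    if __name__ == '__main__':
        # Hypothetical copy map, in the shape returned by compute_copies().
        copies = {'present': {'uffizi'},
                  'missing': {'banco', 'azure'},
                  'ongoing': {}}
        if need_archival(copies, retention_policy=2, archival_max_age=3600):
            for src, dest in choose_backup_servers(copies['present'],
                                                   copies['missing'],
                                                   sources=['uffizi'],
                                                   retention_policy=2):
                print('copy object from %s to %s' % (src, dest))

As in the removed code, each required copy gets a unique destination while the source may be reused, so a single healthy archive can seed several new copies until the retention policy is satisfied.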