Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9123175
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
91 KB
Subscribers
None
View Options
diff --git a/debian/control b/debian/control
index 18b99f50..d3dfcc1f 100644
--- a/debian/control
+++ b/debian/control
@@ -1,57 +1,48 @@
Source: swh-storage
Maintainer: Software Heritage developers <swh-devel@inria.fr>
Section: python
Priority: optional
Build-Depends: debhelper (>= 9),
dh-python,
python3-all,
python3-click,
python3-dateutil,
python3-flask,
python3-nose,
python3-psycopg2,
python3-requests,
python3-setuptools,
python3-swh.core (>= 0.0.28~),
python3-swh.model (>= 0.0.15~),
python3-swh.objstorage (>= 0.0.17~),
python3-swh.scheduler (>= 0.0.14~),
python3-aiohttp,
python3-vcversioner
Standards-Version: 3.9.6
Homepage: https://forge.softwareheritage.org/diffusion/DSTO/
Package: python3-swh.storage
Architecture: all
Depends: python3-swh.core (>= 0.0.28~),
python3-swh.model (>= 0.0.15~),
python3-swh.objstorage (>= 0.0.17~),
${misc:Depends},
${python3:Depends}
Description: Software Heritage storage utilities
Package: python3-swh.storage.listener
Architecture: all
Depends: python3-swh.journal (>= 0.0.2~),
python3-kafka (>= 1.3.1~),
python3-swh.storage (= ${binary:Version}),
${misc:Depends},
${python3:Depends}
Description: Software Heritage storage listener
-Package: python3-swh.storage.archiver
-Architecture: all
-Depends: python3-swh.scheduler (>= 0.0.14~),
- python3-swh.journal,
- python3-swh.storage (= ${binary:Version}),
- ${misc:Depends},
- ${python3:Depends}
-Description: Software Heritage storage Archiver
-
Package: python3-swh.storage.provenance
Architecture: all
Depends: python3-swh.scheduler (>= 0.0.14~),
python3-swh.storage (= ${binary:Version}),
${misc:Depends},
${python3:Depends}
Description: Software Heritage storage Provenance
diff --git a/debian/rules b/debian/rules
index 5dbaab1b..cf9189bd 100755
--- a/debian/rules
+++ b/debian/rules
@@ -1,26 +1,23 @@
#!/usr/bin/make -f
export PYBUILD_NAME=swh.storage
%:
dh $@ --with python3 --buildsystem=pybuild
override_dh_install:
dh_install
for pyvers in $(shell py3versions -vr); do \
mkdir -p $(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \
mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/listener.py \
$(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \
- mkdir -p $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver ; \
- mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/* \
- $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/ ; \
mkdir -p $(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance ; \
mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/* \
$(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/ ; \
done
override_dh_auto_test:
PYBUILD_SYSTEM=custom \
PYBUILD_TEST_ARGS="cd {build_dir}; python{version} -m nose swh -sva '!db'" \
dh_auto_test
diff --git a/sql/archiver/Makefile b/sql/archiver/Makefile
deleted file mode 100644
index c132dbcc..00000000
--- a/sql/archiver/Makefile
+++ /dev/null
@@ -1,42 +0,0 @@
-# Depends: postgresql-client, postgresql-autodoc
-
-DBNAME = softwareheritage-archiver-dev
-DOCDIR = autodoc
-
-SQL_INIT = ../swh-init.sql
-SQL_SCHEMA = swh-archiver-schema.sql
-SQL_FUNC = swh-archiver-func.sql
-SQL_DATA = swh-archiver-data.sql
-SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_DATA)
-
-PSQL_BIN = psql
-PSQL_FLAGS = --single-transaction --echo-all -X
-PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
-
-
-all:
-
-createdb: createdb-stamp
-createdb-stamp: $(SQL_INIT)
- createdb $(DBNAME)
- touch $@
-
-filldb: filldb-stamp
-filldb-stamp: createdb-stamp
- cat $(SQLS) | $(PSQL) $(DBNAME)
- touch $@
-
-dropdb:
- -dropdb $(DBNAME)
-
-dumpdb: swh-archiver.dump
-swh.dump: filldb-stamp
- pg_dump -Fc $(DBNAME) > $@
-
-clean:
- rm -rf *-stamp $(DOCDIR)/
-
-distclean: clean dropdb
- rm -f swh.dump
-
-.PHONY: all initdb createdb dropdb doc clean
diff --git a/sql/archiver/swh-archiver-data.sql b/sql/archiver/swh-archiver-data.sql
deleted file mode 100644
index e4a70a25..00000000
--- a/sql/archiver/swh-archiver-data.sql
+++ /dev/null
@@ -1,3 +0,0 @@
-INSERT INTO archive(name) VALUES('uffizi');
-INSERT INTO archive(name) VALUES('banco');
-INSERT INTO archive(name) VALUES('azure');
diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql
deleted file mode 100644
index 750bfae2..00000000
--- a/sql/archiver/swh-archiver-func.sql
+++ /dev/null
@@ -1,40 +0,0 @@
-create or replace function swh_mktemp_content()
- returns void
- language plpgsql
-as $$
- begin
- create temporary table tmp_content (
- sha1 sha1 not null
- ) on commit drop;
- return;
- end
-$$;
-
-create or replace function swh_content_copies_from_temp(archive_names text[])
- returns void
- language plpgsql
-as $$
- begin
- with existing_content_ids as (
- select id
- from content
- inner join tmp_content on content.sha1 = tmp.sha1
- ), created_content_ids as (
- insert into content (sha1)
- select sha1 from tmp_content
- on conflict do nothing
- returning id
- ), content_ids as (
- select * from existing_content_ids
- union all
- select * from created_content_ids
- ), archive_ids as (
- select id from archive
- where name = any(archive_names)
- ) insert into content_copies (content_id, archive_id, mtime, status)
- select content_ids.id, archive_ids.id, now(), 'present'
- from content_ids cross join archive_ids
- on conflict (content_id, archive_id) do update
- set mtime = excluded.mtime, status = excluded.status;
- end
-$$;
diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql
deleted file mode 100644
index e5849585..00000000
--- a/sql/archiver/swh-archiver-schema.sql
+++ /dev/null
@@ -1,63 +0,0 @@
--- In order to archive the content of the object storage, add
--- some tables to keep trace of what have already been archived.
-
-create table dbversion
-(
- version int primary key,
- release timestamptz,
- description text
-);
-
-comment on table dbversion is 'Schema update tracking';
-
-INSERT INTO dbversion(version, release, description)
-VALUES(10, now(), 'Work In Progress');
-
-CREATE TABLE archive (
- id bigserial PRIMARY KEY,
- name text not null
-);
-
-create unique index on archive(name);
-
-comment on table archive is 'The archives in which contents are stored';
-comment on column archive.id is 'Short identifier for archives';
-comment on column archive.name is 'Name of the archive';
-
-CREATE TYPE archive_status AS ENUM (
- 'missing',
- 'ongoing',
- 'present',
- 'corrupted'
-);
-
-comment on type archive_status is 'Status of a given copy of a content';
-
--- a SHA1 checksum (not necessarily originating from Git)
-CREATE DOMAIN sha1 AS bytea CHECK (LENGTH(VALUE) = 20);
-
--- a bucket for which we count items
-CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2);
-
-create table content (
- id bigserial primary key,
- sha1 sha1 not null
-);
-
-comment on table content is 'All the contents being archived by Software Heritage';
-comment on column content.id is 'Short id for the content being archived';
-comment on column content.sha1 is 'SHA1 hash of the content being archived';
-
-create unique index on content(sha1);
-
-create table content_copies (
- content_id bigint not null, -- references content(id)
- archive_id bigint not null, -- references archive(id)
- mtime timestamptz,
- status archive_status not null,
- primary key (content_id, archive_id)
-);
-
-comment on table content_copies is 'Tracking of all content copies in the archives';
-comment on column content_copies.mtime is 'Last update time of the copy';
-comment on column content_copies.status is 'Status of the copy';
diff --git a/sql/archiver/upgrades/002.sql b/sql/archiver/upgrades/002.sql
deleted file mode 100644
index d83db028..00000000
--- a/sql/archiver/upgrades/002.sql
+++ /dev/null
@@ -1,9 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 1
--- to_version: 2
--- description: Add a 'corrupted' status into the archive_content status
-
-INSERT INTO dbversion(version, release, description)
-VALUES(2, now(), 'Work In Progress');
-
-ALTER TYPE archive_status ADD VALUE 'corrupted';
diff --git a/sql/archiver/upgrades/003.sql b/sql/archiver/upgrades/003.sql
deleted file mode 100644
index ba43f526..00000000
--- a/sql/archiver/upgrades/003.sql
+++ /dev/null
@@ -1,25 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 2
--- to_version: 3
--- description: Add a 'num_present' cache column into the archive_content status
-
-INSERT INTO dbversion(version, release, description)
-VALUES(3, now(), 'Work In Progress');
-
-alter table content_archive add column num_present int default null;
-comment on column content_archive.num_present is 'Number of copies marked as present (cache updated via trigger)';
-
-create index concurrently on content_archive(num_present);
-
--- Keep the num_copies cache updated
-CREATE FUNCTION update_num_present() RETURNS TRIGGER AS $$
- BEGIN
- NEW.num_present := (select count(*) from jsonb_each(NEW.copies) where value->>'status' = 'present');
- RETURN new;
- END;
-$$ LANGUAGE PLPGSQL;
-
-CREATE TRIGGER update_num_present
- BEFORE INSERT OR UPDATE OF copies ON content_archive
- FOR EACH ROW
- EXECUTE PROCEDURE update_num_present();
diff --git a/sql/archiver/upgrades/004.sql b/sql/archiver/upgrades/004.sql
deleted file mode 100644
index bfb5ad31..00000000
--- a/sql/archiver/upgrades/004.sql
+++ /dev/null
@@ -1,44 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 3
--- to_version: 4
--- description: Add azure instance
-
-INSERT INTO dbversion(version, release, description)
-VALUES(4, now(), 'Work In Progress');
-
-ALTER TABLE archive DROP COLUMN url;
-ALTER TABLE archive ALTER COLUMN id SET DATA TYPE TEXT;
-
-INSERT INTO archive(id) VALUES ('azure');
-
-create or replace function swh_mktemp_content_archive()
- returns void
- language sql
-as $$
- create temporary table tmp_content_archive (
- like content_archive including defaults
- ) on commit drop;
- alter table tmp_content_archive drop column copies;
- alter table tmp_content_archive drop column num_present;
-$$;
-
-COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive';
-
-create or replace function swh_content_archive_missing(backend_name text)
- returns setof sha1
- language plpgsql
-as $$
-begin
- return query
- select content_id
- from tmp_content_archive tmp where exists (
- select 1
- from content_archive c
- where tmp.content_id = c.content_id
- and (not c.copies ? backend_name
- or c.copies @> jsonb_build_object(backend_name, '{"status": "missing"}'::jsonb))
- );
-end
-$$;
-
-COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend';
diff --git a/sql/archiver/upgrades/005.sql b/sql/archiver/upgrades/005.sql
deleted file mode 100644
index bc50631c..00000000
--- a/sql/archiver/upgrades/005.sql
+++ /dev/null
@@ -1,24 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 4
--- to_version: 5
--- description: List unknown sha1s from content_archive
-
-INSERT INTO dbversion(version, release, description)
-VALUES(5, now(), 'Work In Progress');
-
-create or replace function swh_content_archive_unknown()
- returns setof sha1
- language plpgsql
-as $$
-begin
- return query
- select content_id
- from tmp_content_archive tmp where not exists (
- select 1
- from content_archive c
- where tmp.content_id = c.content_id
- );
-end
-$$;
-
-COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1';
diff --git a/sql/archiver/upgrades/006.sql b/sql/archiver/upgrades/006.sql
deleted file mode 100644
index d9d1b24c..00000000
--- a/sql/archiver/upgrades/006.sql
+++ /dev/null
@@ -1,100 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 5
--- to_version: 6
--- description: Create a bucketed count of contents in the archive.
-
-INSERT INTO dbversion(version, release, description)
-VALUES(6, now(), 'Work In Progress');
-
--- a bucket for which we count items
-CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2);
-
-CREATE TABLE content_archive_counts (
- archive text not null references archive(id),
- bucket bucket not null,
- count bigint,
- primary key (archive, bucket)
-);
-
-comment on table content_archive_counts is 'Bucketed count of archive contents';
-comment on column content_archive_counts.archive is 'the archive for which we''re counting';
-comment on column content_archive_counts.bucket is 'the bucket of items we''re counting';
-comment on column content_archive_counts.count is 'the number of items counted in the given bucket';
-
-
-CREATE FUNCTION count_copies(from_id bytea, to_id bytea) returns void language sql as $$
- with sample as (
- select content_id, copies from content_archive
- where content_id > from_id and content_id <= to_id
- ), data as (
- select substring(content_id from 19) as bucket, jbe.key as archive
- from sample
- join lateral jsonb_each(copies) jbe on true
- where jbe.value->>'status' = 'present'
- ), bucketed as (
- select bucket, archive, count(*) as count
- from data
- group by bucket, archive
- ) update content_archive_counts cac set
- count = cac.count + bucketed.count
- from bucketed
- where cac.archive = bucketed.archive and cac.bucket = bucketed.bucket;
-$$;
-
-comment on function count_copies(bytea, bytea) is 'Count the objects between from_id and to_id, add the results to content_archive_counts';
-
-CREATE FUNCTION init_content_archive_counts() returns void language sql as $$
- insert into content_archive_counts (
- select id, decode(lpad(to_hex(bucket), 4, '0'), 'hex')::bucket as bucket, 0 as count
- from archive join lateral generate_series(0, 65535) bucket on true
- ) on conflict (archive, bucket) do nothing;
-$$;
-
-comment on function init_content_archive_counts() is 'Initialize the content archive counts for the registered archives';
-
--- keep the content_archive_counts updated
-CREATE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
- DECLARE
- content_id sha1;
- content_bucket bucket;
- copies record;
- old_row content_archive;
- new_row content_archive;
- BEGIN
- -- default values for old or new row depending on trigger type
- if tg_op = 'INSERT' then
- old_row := (null::sha1, '{}'::jsonb, 0);
- else
- old_row := old;
- end if;
- if tg_op = 'DELETE' then
- new_row := (null::sha1, '{}'::jsonb, 0);
- else
- new_row := new;
- end if;
-
- -- get the content bucket
- content_id := coalesce(old_row.content_id, new_row.content_id);
- content_bucket := substring(content_id from 19)::bucket;
-
- -- compare copies present in old and new row for each archive type
- FOR copies IN
- select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status
- from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key
- LOOP
- -- the count didn't change
- CONTINUE WHEN copies.old_status is distinct from copies.new_status OR
- (copies.old_status != 'present' AND copies.new_status != 'present');
-
- update content_archive_counts cac
- set count = count + (case when copies.old_status = 'present' then -1 else 1 end)
- where archive = copies.archive and bucket = content_bucket;
- END LOOP;
- return null;
- END;
-$$;
-
-create trigger update_content_archive_counts
- AFTER INSERT OR UPDATE OR DELETE ON content_archive
- FOR EACH ROW
- EXECUTE PROCEDURE update_content_archive_counts();
diff --git a/sql/archiver/upgrades/007.sql b/sql/archiver/upgrades/007.sql
deleted file mode 100644
index 34049426..00000000
--- a/sql/archiver/upgrades/007.sql
+++ /dev/null
@@ -1,21 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 6
--- to_version: 7
--- description: Add a function to compute archive counts
-
-INSERT INTO dbversion(version, release, description)
-VALUES(7, now(), 'Work In Progress');
-
-create type content_archive_count as (
- archive text,
- count bigint
-);
-
-create or replace function get_content_archive_counts() returns setof content_archive_count language sql as $$
- select archive, sum(count)::bigint
- from content_archive_counts
- group by archive
- order by archive;
-$$;
-
-comment on function get_content_archive_counts() is 'Get count for each archive';
diff --git a/sql/archiver/upgrades/008.sql b/sql/archiver/upgrades/008.sql
deleted file mode 100644
index 6527aca6..00000000
--- a/sql/archiver/upgrades/008.sql
+++ /dev/null
@@ -1,49 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 7
--- to_version: 8
--- description: Fix silly bug in update_content_archive_counts
-
-INSERT INTO dbversion(version, release, description)
-VALUES(8, now(), 'Work In Progress');
-
--- keep the content_archive_counts updated
-CREATE OR REPLACE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
- DECLARE
- content_id sha1;
- content_bucket bucket;
- copies record;
- old_row content_archive;
- new_row content_archive;
- BEGIN
- -- default values for old or new row depending on trigger type
- if tg_op = 'INSERT' then
- old_row := (null::sha1, '{}'::jsonb, 0);
- else
- old_row := old;
- end if;
- if tg_op = 'DELETE' then
- new_row := (null::sha1, '{}'::jsonb, 0);
- else
- new_row := new;
- end if;
-
- -- get the content bucket
- content_id := coalesce(old_row.content_id, new_row.content_id);
- content_bucket := substring(content_id from 19)::bucket;
-
- -- compare copies present in old and new row for each archive type
- FOR copies IN
- select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status
- from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key
- LOOP
- -- the count didn't change
- CONTINUE WHEN copies.old_status is not distinct from copies.new_status OR
- (copies.old_status != 'present' AND copies.new_status != 'present');
-
- update content_archive_counts cac
- set count = count + (case when copies.old_status = 'present' then -1 else 1 end)
- where archive = copies.archive and bucket = content_bucket;
- END LOOP;
- return null;
- END;
-$$;
diff --git a/sql/archiver/upgrades/009.sql b/sql/archiver/upgrades/009.sql
deleted file mode 100644
index 5a3133ba..00000000
--- a/sql/archiver/upgrades/009.sql
+++ /dev/null
@@ -1,42 +0,0 @@
--- SWH Archiver DB schema upgrade
--- from_version: 8
--- to_version: 9
--- description: Add helper functions to create temporary table and insert new entries in content_archive table
-
-insert into dbversion(version, release, description)
-values(9, now(), 'Work In Progress');
-
--- create a temporary table called tmp_TBLNAME, mimicking existing
--- table TBLNAME
-create or replace function swh_mktemp(tblname regclass)
- returns void
- language plpgsql
-as $$
-begin
- execute format('
- create temporary table tmp_%1$I
- (like %1$I including defaults)
- on commit drop;
- ', tblname);
- return;
-end
-$$;
-
-comment on function swh_mktemp(regclass) is 'Helper function to create a temporary table mimicking the existing one';
-
--- Helper function to insert new entries in content_archive from a
--- temporary table skipping duplicates.
-create or replace function swh_content_archive_add()
- returns void
- language plpgsql
-as $$
-begin
- insert into content_archive (content_id, copies, num_present)
- select distinct content_id, copies, num_present
- from tmp_content_archive
- on conflict(content_id) do nothing;
- return;
-end
-$$;
-
-comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive';
diff --git a/swh/storage/archiver/__init__.py b/swh/storage/archiver/__init__.py
deleted file mode 100644
index 2ff1cce1..00000000
--- a/swh/storage/archiver/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from .director import ArchiverWithRetentionPolicyDirector # NOQA
-from .director import ArchiverStdinToBackendDirector # NOQA
-from .worker import ArchiverWithRetentionPolicyWorker # NOQA
-from .worker import ArchiverToBackendWorker # NOQA
-from .copier import ArchiverCopier # NOQA
diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
deleted file mode 100644
index 1832e2bd..00000000
--- a/swh/storage/archiver/copier.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-import logging
-
-from swh.objstorage.exc import ObjNotFoundError
-from swh.model import hashutil
-
-logger = logging.getLogger('archiver.worker.copier')
-
-
-class ArchiverCopier():
- """ This archiver copy some files into a remote objstorage
- in order to get a backup.
- """
- def __init__(self, source, destination, content_ids):
- """ Create a Copier for the archiver
-
- Args:
- source (ObjStorage): source storage to get the contents.
- destination (ObjStorage): Storage where the contents will
- be copied.
- content_ids: list of content's id to archive.
- """
- self.source = source
- self.destination = destination
- self.content_ids = content_ids
-
- def run(self):
- """ Do the copy on the backup storage.
-
- Run the archiver copier in order to copy the required content
- into the current destination.
- The content which corresponds to the sha1 in self.content_ids
- will be fetched from the master_storage and then copied into
- the backup object storage.
-
- Returns:
- A boolean that indicates if the whole content have been copied.
- """
- try:
- for content_id in self.content_ids:
- try:
- content = self.source.get(content_id)
- except ObjNotFoundError:
- logging.error('content %s not found' %
- hashutil.hash_to_hex(content_id))
- continue
- self.destination.add(content, content_id)
- except Exception as e:
- logger.exception('Problem during copy: %s' % e)
- return False
- return True
diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py
deleted file mode 100644
index d9e5e439..00000000
--- a/swh/storage/archiver/db.py
+++ /dev/null
@@ -1,224 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-
-from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure
-
-
-def utcnow():
- return datetime.datetime.now(tz=datetime.timezone.utc)
-
-
-class ArchiverDb(BaseDb):
- """Proxy to the SWH's archiver DB
-
- """
-
- def archive_ls(self, cur=None):
- """ Get all the archives registered on the server.
-
- Yields:
- a tuple (server_id, server_url) for each archive server.
- """
- cur = self._cursor(cur)
- cur.execute("SELECT * FROM archive")
- yield from cursor_to_bytes(cur)
-
- def content_archive_get(self, content_id, cur=None):
- """ Get the archival status of a content in a specific server.
-
- Retrieve from the database the archival status of the given content
- in the given archive server.
-
- Args:
- content_id: the sha1 of the content.
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
- """
- query = """select archive.name, status, mtime
- from content_copies
- left join archive on content_copies.archive_id = archive.id
- where content_copies.content_id = (
- select id from content where sha1 = %s)
- """
- cur = self._cursor(cur)
- cur.execute(query, (content_id,))
- rows = cur.fetchall()
- if not rows:
- return None
- present = []
- ongoing = {}
- for archive, status, mtime in rows:
- if status == 'present':
- present.append(archive)
- elif status == 'ongoing':
- ongoing[archive] = mtime
- return (content_id, present, ongoing)
-
- def content_archive_get_copies(self, last_content=None, limit=1000,
- cur=None):
- """Get the list of copies for `limit` contents starting after
- `last_content`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
-
- vars = {
- 'limit': limit,
- }
-
- if last_content is None:
- last_content_clause = 'true'
- else:
- last_content_clause = """content_id > (
- select id from content
- where sha1 = %(last_content)s)"""
- vars['last_content'] = last_content
-
- query = """select
- (select sha1 from content where id = content_id),
- array_agg((select name from archive
- where id = archive_id))
- from content_copies
- where status = 'present' and %s
- group by content_id
- order by content_id
- limit %%(limit)s""" % last_content_clause
-
- cur = self._cursor(cur)
- cur.execute(query, vars)
- for content_id, present in cursor_to_bytes(cur):
- yield (content_id, present, {})
-
- def content_archive_get_unarchived_copies(
- self, retention_policy, last_content=None,
- limit=1000, cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`. Yields only copies with number of present
- smaller than `retention policy`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- retention_policy: number of required present copies
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
-
- vars = {
- 'limit': limit,
- 'retention_policy': retention_policy,
- }
-
- if last_content is None:
- last_content_clause = 'true'
- else:
- last_content_clause = """content_id > (
- select id from content
- where sha1 = %(last_content)s)"""
- vars['last_content'] = last_content
-
- query = """select
- (select sha1 from content where id = content_id),
- array_agg((select name from archive
- where id = archive_id))
- from content_copies
- where status = 'present' and %s
- group by content_id
- having count(archive_id) < %%(retention_policy)s
- order by content_id
- limit %%(limit)s""" % last_content_clause
-
- cur = self._cursor(cur)
- cur.execute(query, vars)
- for content_id, present in cursor_to_bytes(cur):
- yield (content_id, present, {})
-
- @stored_procedure('swh_mktemp_content_archive')
- def mktemp_content_archive(self, cur=None):
- """Trigger the creation of the temporary table tmp_content_archive
- during the lifetime of the transaction.
-
- """
- pass
-
- @stored_procedure('swh_content_archive_add')
- def content_archive_add_from_temp(self, cur=None):
- """Add new content archive entries from temporary table.
-
- Use from archiver.storage module:
- self.db.mktemp_content_archive()
- # copy data over to the temp table
- self.db.copy_to([{'colname': id0}, {'colname': id1}],
- 'tmp_cache_content',
- ['colname'], cur)
- # insert into the main table
- self.db.add_content_archive_from_temp(cur)
-
- """
- pass
-
- def content_archive_get_missing(self, backend_name, cur=None):
- """Retrieve the content missing from backend_name.
-
- """
- cur = self._cursor(cur)
- cur.execute("select * from swh_content_archive_missing(%s)",
- (backend_name,))
- yield from cursor_to_bytes(cur)
-
- def content_archive_get_unknown(self, cur=None):
- """Retrieve unknown sha1 from archiver db.
-
- """
- cur = self._cursor(cur)
- cur.execute('select * from swh_content_archive_unknown()')
- yield from cursor_to_bytes(cur)
-
- def content_archive_update(self, content_id, archive_id,
- new_status=None, cur=None):
- """ Update the status of an archive content and set its mtime to
-
- Change the mtime of an archived content for the given archive and set
- it's mtime to the current time.
-
- Args:
- content_id (str): content sha1
- archive_id (str): name of the archive
- new_status (str): one of 'missing', 'present' or 'ongoing'.
- this status will replace the previous one. If not given,
- the function only change the mtime of the content for the
- given archive.
- """
- assert isinstance(content_id, bytes)
- assert new_status is not None
-
- query = """insert into content_copies (archive_id, content_id, status, mtime)
- values ((select id from archive where name=%s),
- (select id from content where sha1=%s),
- %s, %s)
- on conflict (archive_id, content_id) do
- update set status = excluded.status, mtime = excluded.mtime
- """
-
- cur = self._cursor(cur)
- cur.execute(query, (archive_id, content_id, new_status, utcnow()))
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
deleted file mode 100644
index 20b36a36..00000000
--- a/swh/storage/archiver/director.py
+++ /dev/null
@@ -1,339 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import logging
-import sys
-import time
-
-import click
-
-from swh.core import config, utils
-from swh.model import hashutil
-from swh.objstorage import get_objstorage
-from swh.scheduler.utils import get_task
-
-from . import tasks # noqa
-from .storage import get_archiver_storage
-
-
-class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta):
- """Abstract Director class
-
- An archiver director is in charge of dispatching batch of
- contents to archiver workers (for them to archive).
-
- Inherit from this class and provide:
- - ADDITIONAL_CONFIG: Some added configuration needed for the
- director to work
- - CONFIG_BASE_FILENAME: relative path to lookup for the
- configuration file
- - def get_contents_to_archive(self): Implementation method to read
- contents to archive
-
- """
- DEFAULT_CONFIG = {
- 'batch_max_size': ('int', 1500),
- 'asynchronous': ('bool', True),
- 'max_queue_length': ('int', 100000),
- 'queue_throttling_delay': ('int', 120),
-
- 'archiver_storage': ('dict', {
- 'cls': 'db',
- 'args': {
- 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest',
- },
- }),
- }
-
- # Destined to be overridden by subclass
- ADDITIONAL_CONFIG = {}
-
- # We use the same configuration file as the worker
- CONFIG_BASE_FILENAME = 'archiver/worker'
-
- # The worker's task queue name to use
- TASK_NAME = None
-
- def __init__(self):
- """ Constructor of the archiver director.
-
- Args:
- db_conn_archiver: Either a libpq connection string,
- or a psycopg2 connection for the archiver db.
- config: optionnal additional configuration. Keys in the dict will
- override the one parsed from the configuration file.
- """
- super().__init__()
- self.config = self.parse_config_file(
- additional_configs=[self.ADDITIONAL_CONFIG])
- self.archiver_storage = get_archiver_storage(
- **self.config['archiver_storage'])
- self.task = get_task(self.TASK_NAME)
- self.max_queue_length = self.config['max_queue_length']
- self.throttling_delay = self.config['queue_throttling_delay']
-
- def run(self):
- """ Run the archiver director.
-
- The archiver director will check all the contents of the archiver
- database and do the required backup jobs.
- """
- if self.config['asynchronous']:
- run_fn = self.run_async_worker
- else:
- run_fn = self.run_sync_worker
-
- for batch in self.read_batch_contents():
- run_fn(batch)
-
- def run_async_worker(self, batch):
- """Produce a worker that will be added to the task queue.
-
- """
- max_length = self.max_queue_length
- throttling_delay = self.throttling_delay
-
- while True:
- length = self.task.app.get_queue_length(self.task.task_queue)
- if length >= max_length:
- logging.info(
- 'queue length %s >= %s, throttling for %s seconds' % (
- length,
- max_length,
- throttling_delay,
- )
- )
- time.sleep(throttling_delay)
- else:
- break
-
- self.task.delay(batch=batch)
-
- def run_sync_worker(self, batch):
- """Run synchronously a worker on the given batch.
-
- """
- self.task(batch=batch)
-
- def read_batch_contents(self):
- """ Create batch of contents that needs to be archived
-
- Yields:
- batch of sha1 that corresponds to contents that needs more archive
- copies.
- """
- contents = []
- for content in self.get_contents_to_archive():
- contents.append(content)
- if len(contents) > self.config['batch_max_size']:
- yield contents
- contents = []
- if len(contents) > 0:
- yield contents
-
- @abc.abstractmethod
- def get_contents_to_archive(self):
- """Retrieve generator of sha1 to archive
-
- Yields:
- sha1 to archive
-
- """
- pass
-
-
-class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase):
- """Process the files in order to know which one is needed as backup.
-
- The archiver director processes the files in the local storage in order
- to know which one needs archival and it delegates this task to
- archiver workers.
- """
-
- ADDITIONAL_CONFIG = {
- 'retention_policy': ('int', 2),
- }
-
- TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask'
-
- def __init__(self, start_id):
- super().__init__()
- if start_id is not None:
- self.start_id = hashutil.hash_to_bytes(start_id)
- else:
- self.start_id = None
-
- def get_contents_to_archive(self):
- """Create batch of contents that needs to be archived
-
- Yields:
- Datas about a content as a tuple
- (content_id, present_copies, ongoing_copies) where ongoing_copies
- is a dict mapping copy to mtime.
-
- """
- last_content = self.start_id
- while True:
- archiver_contents = list(
- self.archiver_storage.content_archive_get_unarchived_copies(
- last_content=last_content,
- retention_policy=self.config['retention_policy'],
- limit=self.config['batch_max_size']))
- if not archiver_contents:
- return
- for content_id, _, _ in archiver_contents:
- last_content = content_id
- yield content_id
-
-
-def read_sha1_from_stdin():
- """Read sha1 from stdin.
-
- """
- for line in sys.stdin:
- sha1 = line.strip()
- try:
- yield hashutil.hash_to_bytes(sha1)
- except Exception:
- print("%s is not a valid sha1 hash, continuing" % repr(sha1),
- file=sys.stderr)
- continue
-
-
-class ArchiverStdinToBackendDirector(ArchiverDirectorBase):
- """A cloud archiver director in charge of reading contents and send
- them in batch in the cloud.
-
- The archiver director, in order:
- - Reads sha1 to send to a specific backend.
- - Checks if those sha1 are known in the archiver. If they are not,
- add them
- - if the sha1 are missing, they are sent for the worker to archive
-
- If the flag force_copy is set, this will force the copy to be sent
- for archive even though it has already been done.
-
- """
- ADDITIONAL_CONFIG = {
- 'destination': ('str', 'azure'),
- 'force_copy': ('bool', False),
- 'source': ('str', 'uffizi'),
- 'storages': ('list[dict]',
- [
- {'host': 'uffizi',
- 'cls': 'pathslicing',
- 'args': {'root': '/tmp/softwareheritage/objects',
- 'slicing': '0:2/2:4/4:6'}},
- {'host': 'banco',
- 'cls': 'remote',
- 'args': {'base_url': 'http://banco:5003/'}}
- ])
- }
-
- CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
-
- TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask'
-
- def __init__(self):
- super().__init__()
- self.destination = self.config['destination']
- self.force_copy = self.config['force_copy']
- self.objstorages = {
- storage['host']: get_objstorage(storage['cls'], storage['args'])
- for storage in self.config.get('storages', [])
- }
- # Fallback objstorage
- self.source = self.config['source']
-
- def _add_unknown_content_ids(self, content_ids):
- """Check whether some content_id are unknown.
- If they are, add them to the archiver db.
-
- Args:
- content_ids: List of dict with one key content_id
-
- """
- source_objstorage = self.objstorages[self.source]
-
- self.archiver_storage.content_archive_add(
- (h
- for h in content_ids
- if h in source_objstorage),
- sources_present=[self.source])
-
- def get_contents_to_archive(self):
- gen_content_ids = (
- ids for ids in utils.grouper(read_sha1_from_stdin(),
- self.config['batch_max_size']))
-
- if self.force_copy:
- for content_ids in gen_content_ids:
- content_ids = list(content_ids)
-
- if not content_ids:
- continue
-
- # Add missing entries in archiver table
- self._add_unknown_content_ids(content_ids)
-
- print('Send %s contents to archive' % len(content_ids))
-
- for content_id in content_ids:
- # force its status to missing
- self.archiver_storage.content_archive_update(
- content_id, self.destination, 'missing')
- yield content_id
-
- else:
- for content_ids in gen_content_ids:
- content_ids = list(content_ids)
-
- # Add missing entries in archiver table
- self._add_unknown_content_ids(content_ids)
-
- # Filter already copied data
- content_ids = list(
- self.archiver_storage.content_archive_get_missing(
- content_ids=content_ids,
- backend_name=self.destination))
-
- if not content_ids:
- continue
-
- print('Send %s contents to archive' % len(content_ids))
-
- for content in content_ids:
- yield content
-
- def run_async_worker(self, batch):
- """Produce a worker that will be added to the task queue.
-
- """
- self.task.delay(destination=self.destination, batch=batch)
-
- def run_sync_worker(self, batch):
- """Run synchronously a worker on the given batch.
-
- """
- self.task(destination=self.destination, batch=batch)
-
-
-@click.command()
-@click.option('--direct', is_flag=True,
- help="""The archiver sends content for backup to
-one storage.""")
-@click.option('--start-id', default=None, help="The first id to process")
-def launch(direct, start_id):
- if direct:
- archiver = ArchiverStdinToBackendDirector()
- else:
- archiver = ArchiverWithRetentionPolicyDirector(start_id)
-
- archiver.run()
-
-
-if __name__ == '__main__':
- launch()
diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py
deleted file mode 100644
index 97710582..00000000
--- a/swh/storage/archiver/storage.py
+++ /dev/null
@@ -1,361 +0,0 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import json
-import os
-import psycopg2
-import time
-
-from .db import ArchiverDb
-
-from swh.model import hashutil
-from swh.storage.common import db_transaction_generator, db_transaction
-from swh.storage.exc import StorageDBError
-
-
-class ArchiverStorage():
- """SWH Archiver storage proxy, encompassing DB
-
- """
- def __init__(self, dbconn):
- """
- Args:
- db_conn: either a libpq connection string, or a psycopg2 connection
-
- """
- try:
- if isinstance(dbconn, psycopg2.extensions.connection):
- self.db = ArchiverDb(dbconn)
- else:
- self.db = ArchiverDb.connect(dbconn)
- except psycopg2.OperationalError as e:
- raise StorageDBError(e)
-
- @db_transaction_generator
- def archive_ls(self, cur=None):
- """ Get all the archives registered on the server.
-
- Yields:
- a tuple (server_id, server_url) for each archive server.
- """
- yield from self.db.archive_ls(cur)
-
- @db_transaction
- def content_archive_get(self, content_id, cur=None):
- """ Get the archival status of a content.
-
- Retrieve from the database the archival status of the given content
-
- Args:
- content_id: the sha1 of the content
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
- """
- return self.db.content_archive_get(content_id, cur)
-
- @db_transaction_generator
- def content_archive_get_copies(self, last_content=None, limit=1000,
- cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from self.db.content_archive_get_copies(last_content, limit,
- cur)
-
- @db_transaction_generator
- def content_archive_get_unarchived_copies(
- self, retention_policy, last_content=None,
- limit=1000, cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`. Yields only copies with number of present
- smaller than `retention policy`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- retention_policy: number of required present copies
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from self.db.content_archive_get_unarchived_copies(
- retention_policy, last_content, limit, cur)
-
- @db_transaction_generator
- def content_archive_get_missing(self, content_ids, backend_name, cur=None):
- """Retrieve missing sha1s from source_name.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
- source_name (str): Name of the backend to check for content
-
- Yields:
- missing sha1s from backend_name
-
- """
- db = self.db
-
- db.mktemp_content_archive()
-
- db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur)
-
- for content_id in db.content_archive_get_missing(backend_name, cur):
- yield content_id[0]
-
- @db_transaction_generator
- def content_archive_get_unknown(self, content_ids, cur=None):
- """Retrieve unknown sha1s from content_archive.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
-
- Yields:
- Unknown sha1s from content_archive
-
- """
- db = self.db
-
- db.mktemp_content_archive()
-
- db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur)
-
- for content_id in db.content_archive_get_unknown(cur):
- yield content_id[0]
-
- @db_transaction
- def content_archive_update(self, content_id, archive_id,
- new_status=None, cur=None):
- """ Update the status of an archive content and set its mtime to now
-
- Change the mtime of an archived content for the given archive and set
- it's mtime to the current time.
-
- Args:
- content_id (str): content sha1
- archive_id (str): name of the archive
- new_status (str): one of 'missing', 'present' or 'ongoing'.
- this status will replace the previous one. If not given,
- the function only change the mtime of the content for the
- given archive.
- """
- self.db.content_archive_update(content_id, archive_id, new_status, cur)
-
- @db_transaction
- def content_archive_add(
- self, content_ids, sources_present, cur=None):
- """Insert a new entry in db about content_id.
-
- Args:
- content_ids ([bytes|str]): content identifiers
- sources_present ([str]): List of source names where
- contents are present
- """
- db = self.db
-
- # Prepare copies dictionary
- copies = {}
- for source in sources_present:
- copies[source] = {
- "status": "present",
- "mtime": int(time.time()),
- }
-
- copies = json.dumps(copies)
- num_present = len(sources_present)
-
- db.mktemp('content_archive')
- db.copy_to(
- ({'content_id': id,
- 'copies': copies,
- 'num_present': num_present}
- for id in content_ids),
- 'tmp_content_archive',
- ['content_id', 'copies', 'num_present'],
- cur)
- db.content_archive_add_from_temp(cur)
-
-
-class StubArchiverStorage():
- def __init__(self, archives, present, missing, logfile_base):
- """
- A stub storage for the archiver that doesn't write to disk
-
- Args:
- - archives: a dictionary mapping archive names to archive URLs
- - present: archives where the objects are all considered present
- - missing: archives where the objects are all considered missing
- - logfile_base: basename for the logfile
- """
- self.archives = archives
- self.present = set(present)
- self.missing = set(missing)
- if set(archives) != self.present | self.missing:
- raise ValueError("Present and missing archives don't match")
- self.logfile_base = logfile_base
- self.__logfile = None
-
- def open_logfile(self):
- if self.__logfile:
- return
-
- logfile_name = "%s.%d" % (self.logfile_base, os.getpid())
- self.__logfile = open(logfile_name, 'a')
-
- def close_logfile(self):
- if not self.__logfile:
- return
-
- self.__logfile.close()
- self.__logfile = None
-
- def archive_ls(self, cur=None):
- """ Get all the archives registered on the server.
-
- Yields:
- a tuple (server_id, server_url) for each archive server.
- """
- yield from self.archives.items()
-
- def content_archive_get(self, content_id, cur=None):
- """ Get the archival status of a content.
-
- Retrieve from the database the archival status of the given content
-
- Args:
- content_id: the sha1 of the content
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
- """
- return (content_id, self.present, {})
-
- def content_archive_get_copies(self, last_content=None, limit=1000,
- cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from []
-
- def content_archive_get_unarchived_copies(self, retention_policy,
- last_content=None, limit=1000,
- cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`. Yields only copies with number of present
- smaller than `retention policy`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- retention_policy: number of required present copies
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from []
-
- def content_archive_get_missing(self, content_ids, backend_name, cur=None):
- """Retrieve missing sha1s from source_name.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
- source_name (str): Name of the backend to check for content
-
- Yields:
- missing sha1s from backend_name
-
- """
- if backend_name in self.missing:
- yield from content_ids
- elif backend_name in self.present:
- yield from []
- else:
- raise ValueError('Unknown backend `%s`' % backend_name)
-
- def content_archive_get_unknown(self, content_ids, cur=None):
- """Retrieve unknown sha1s from content_archive.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
-
- Yields:
- Unknown sha1s from content_archive
-
- """
- yield from []
-
- def content_archive_update(self, content_id, archive_id,
- new_status=None, cur=None):
- """ Update the status of an archive content and set its mtime to now
-
- Change the mtime of an archived content for the given archive and set
- it's mtime to the current time.
-
- Args:
- content_id (str): content sha1
- archive_id (str): name of the archive
- new_status (str): one of 'missing', 'present' or 'ongoing'.
- this status will replace the previous one. If not given,
- the function only change the mtime of the content for the
- given archive.
- """
- if not self.__logfile:
- self.open_logfile()
-
- print(time.time(), archive_id, new_status,
- hashutil.hash_to_hex(content_id), file=self.__logfile)
-
- def content_archive_add(
- self, content_ids, sources_present, cur=None):
- """Insert a new entry in db about content_id.
-
- Args:
- content_ids ([bytes|str]): content identifiers
- sources_present ([str]): List of source names where
- contents are present
- """
- pass
-
-
-def get_archiver_storage(cls, args):
- """Instantiate an archiver database with the proper class and arguments"""
- if cls == 'db':
- return ArchiverStorage(**args)
- elif cls == 'stub':
- return StubArchiverStorage(**args)
- else:
- raise ValueError('Unknown Archiver Storage class `%s`' % cls)
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
deleted file mode 100644
index ccb0a2f6..00000000
--- a/swh/storage/archiver/tasks.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from swh.scheduler.task import Task
-from .worker import ArchiverWithRetentionPolicyWorker
-from .worker import ArchiverToBackendWorker
-
-
-class SWHArchiverWithRetentionPolicyTask(Task):
- """Main task that archive a batch of content.
-
- """
- task_queue = 'swh_storage_archive_worker'
-
- def run_task(self, *args, **kwargs):
- ArchiverWithRetentionPolicyWorker(*args, **kwargs).run()
-
-
-class SWHArchiverToBackendTask(Task):
- """Main task that archive a batch of content in the cloud.
-
- """
- task_queue = 'swh_storage_archive_worker_to_backend'
-
- def run_task(self, *args, **kwargs):
- ArchiverToBackendWorker(*args, **kwargs).run()
diff --git a/swh/storage/archiver/updater.py b/swh/storage/archiver/updater.py
deleted file mode 100644
index 64cf1cea..00000000
--- a/swh/storage/archiver/updater.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright (C) 2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import logging
-
-from swh.journal.client import SWHJournalClient
-
-from .storage import get_archiver_storage
-
-
-class SWHArchiverContentUpdater(SWHJournalClient):
- """Client in charge of updating new contents in the content_archiver
- db.
-
- This is a swh.journal client only dealing with contents.
-
- """
- CONFIG_BASE_FILENAME = 'archiver/content_updater'
-
- ADDITIONAL_CONFIG = {
- 'archiver_storage': (
- 'dict', {
- 'cls': 'db',
- 'args': {
- 'dbconn': 'dbname=softwareheritage-archiver-dev '
- 'user=guest',
- }
- }),
- 'sources_present': ('list[str]', ['uffizi'])
- }
-
- def __init__(self):
- # Only interested in content here so override the configuration
- super().__init__(extra_configuration={'object_types': ['content']})
-
- self.sources_present = self.config['sources_present']
-
- self.archiver_storage = get_archiver_storage(
- **self.config['archiver_storage'])
-
- def process_objects(self, messages):
- self.archiver_storage.content_archive_add(
- (c[b'sha1'] for c in messages['content']),
- self.sources_present)
-
-
-if __name__ == '__main__':
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s %(process)d %(levelname)s %(message)s'
- )
-
- content_updater = SWHArchiverContentUpdater()
- content_updater.process()
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
deleted file mode 100644
index c94d6f15..00000000
--- a/swh/storage/archiver/worker.py
+++ /dev/null
@@ -1,429 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import datetime
-import logging
-import random
-
-from collections import defaultdict
-from celery import group
-
-from swh.core import config, utils
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.model import hashutil
-from swh.scheduler.utils import get_task
-
-from .storage import get_archiver_storage
-from .copier import ArchiverCopier
-
-
-logger = logging.getLogger('archiver.worker')
-
-
-class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta):
- """Base archive worker.
-
- Inherit from this class and override:
- - ADDITIONAL_CONFIG: Some added configuration needed for the
- director to work
- - CONFIG_BASE_FILENAME: relative path to lookup for the
- configuration file
- - def need_archival(self, content_data): Determine if a content
- needs archival or not
- - def choose_backup_servers(self, present, missing): Choose
- which backup server to send copies to
-
- """
- DEFAULT_CONFIG = {
- 'archiver_storage': ('dict', {
- 'cls': 'db',
- 'args': {
- 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest',
- },
- }),
- 'storages': ('list[dict]',
- [
- {'host': 'uffizi',
- 'cls': 'pathslicing',
- 'args': {'root': '/tmp/softwareheritage/objects',
- 'slicing': '0:2/2:4/4:6'}},
- {'host': 'banco',
- 'cls': 'remote',
- 'args': {'base_url': 'http://banco:5003/'}}
- ])
- }
-
- ADDITIONAL_CONFIG = {}
-
- CONFIG_BASE_FILENAME = 'archiver/worker'
-
- objstorages = {}
-
- def __init__(self, batch):
- super().__init__()
- self.config = self.parse_config_file(
- additional_configs=[self.ADDITIONAL_CONFIG])
- self.batch = batch
- self.archiver_db = get_archiver_storage(
- **self.config['archiver_storage'])
- self.objstorages = {
- storage['host']: get_objstorage(storage['cls'], storage['args'])
- for storage in self.config.get('storages', [])
- }
- self.set_objstorages = set(self.objstorages)
-
- def run(self):
- """Do the task expected from the archiver worker.
-
- Process the contents in self.batch, ensure that the elements
- still need an archival (using archiver db), and spawn copiers
- to copy files in each destination according to the
- archiver-worker's policy.
-
- """
- transfers = defaultdict(list)
- for obj_id in self.batch:
- # Get dict {'missing': [servers], 'present': [servers]}
- # for contents ignoring those who don't need archival.
- copies = self.compute_copies(self.set_objstorages, obj_id)
- if not copies: # could not happen if using .director module
- msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id)
- logger.warning(msg)
- continue
-
- if not self.need_archival(copies):
- continue
-
- present = copies.get('present', set())
- missing = copies.get('missing', set())
- if len(present) == 0:
- msg = 'Lost content %s' % hashutil.hash_to_hex(obj_id)
- logger.critical(msg)
- continue
-
- # Choose servers to be used as srcs and dests.
- for src_dest in self.choose_backup_servers(present, missing):
- transfers[src_dest].append(obj_id)
-
- # Then run copiers for each of the required transfers.
- contents_copied = []
- for (src, dest), content_ids in transfers.items():
- contents_copied.extend(self.run_copier(src, dest, content_ids))
-
- # copy is done, eventually do something else with them
- self.copy_finished(contents_copied)
-
- def compute_copies(self, set_objstorages, content_id):
- """From a content_id, return present and missing copies.
-
- Args:
- objstorages (set): objstorage's id name
- content_id: the content concerned
-
- Returns:
- A dictionary with the following keys:
- - 'present': set of archives where the content is present
- - 'missing': set of archives where the content is missing
- - 'ongoing': ongoing copies: dict mapping the archive id
- with the time the copy supposedly started.
- """
- result = self.archiver_db.content_archive_get(content_id)
- if not result:
- return None
- _, present, ongoing = result
- set_present = set_objstorages & set(present)
- set_ongoing = set_objstorages & set(ongoing)
- set_missing = set_objstorages - set_present - set_ongoing
- return {
- 'present': set_present,
- 'missing': set_missing,
- 'ongoing': {archive: value
- for archive, value in ongoing.items()
- if archive in set_ongoing},
- }
-
- def run_copier(self, source, destination, content_ids):
- """Run a copier in order to archive the given contents.
-
- Upload the given contents from the source to the destination.
- If the process fails, the whole content is considered uncopied
- and remains 'ongoing', waiting to be rescheduled as there is a
- delay.
-
- Args:
- source (str): source storage's identifier
- destination (str): destination storage's identifier
- content_ids ([sha1]): list of content ids to archive.
-
- """
- # Check if there are any errors among the contents.
- content_status = self.get_contents_error(content_ids, source)
-
- # Iterates over the error detected.
- for content_id, real_status in content_status.items():
- # Remove them from the to-archive list,
- # as they cannot be retrieved correctly.
- content_ids.remove(content_id)
- # Update their status to reflect their real state.
- self.archiver_db.content_archive_update(
- content_id, archive_id=source, new_status=real_status)
-
- # Now perform the copy on the remaining contents
- ac = ArchiverCopier(
- source=self.objstorages[source],
- destination=self.objstorages[destination],
- content_ids=content_ids)
-
- if ac.run():
- # Once the archival complete, update the database.
- for content_id in content_ids:
- self.archiver_db.content_archive_update(
- content_id, archive_id=destination, new_status='present')
-
- return content_ids
- return []
-
- def copy_finished(self, content_ids):
- """Hook to notify the content_ids archive copy is finished.
- (This is not an abstract method as this is optional
- """
- pass
-
- def get_contents_error(self, content_ids, source_storage):
- """Indicates what is the error associated to a content when needed
-
- Check the given content on the given storage. If an error is detected,
- it will be reported through the returned dict.
-
- Args:
- content_ids ([sha1]): list of content ids to check
- source_storage (str): the source storage holding the
- contents to check.
-
- Returns:
- a dict that map {content_id -> error_status} for each content_id
- with an error. The `error_status` result may be 'missing' or
- 'corrupted'.
-
- """
- content_status = {}
- storage = self.objstorages[source_storage]
- for content_id in content_ids:
- try:
- storage.check(content_id)
- except Error:
- content_status[content_id] = 'corrupted'
- logger.error('%s corrupted!' % hashutil.hash_to_hex(
- content_id))
- except ObjNotFoundError:
- content_status[content_id] = 'missing'
- logger.error('%s missing!' % hashutil.hash_to_hex(content_id))
-
- return content_status
-
- @abc.abstractmethod
- def need_archival(self, content_data):
- """Indicate if the content needs to be archived.
-
- Args:
- content_data (dict): dict that contains two lists 'present' and
- 'missing' with copies id corresponding to this status.
-
- Returns:
- True if there is not enough copies, False otherwise.
-
- """
- pass
-
- @abc.abstractmethod
- def choose_backup_servers(self, present, missing):
- """Choose and yield the required amount of couple source/destination
-
- For each required copy, choose a unique destination server
- among the missing copies and a source server among the
- presents.
-
- Args:
- present: set of objstorage source name where the content
- is present
- missing: set of objstorage destination name where the
- content is missing
-
- Yields:
- tuple (source (str), destination (src)) for each required copy.
-
- """
- pass
-
-
-class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker):
- """ Do the required backups on a given batch of contents.
-
- Process the content of a content batch in order to do the needed backups on
- the slaves servers.
- """
-
- ADDITIONAL_CONFIG = {
- 'retention_policy': ('int', 2),
- 'archival_max_age': ('int', 3600),
- 'sources': ('list[str]', ['uffizi', 'banco']),
- }
-
- def __init__(self, batch):
- """ Constructor of the ArchiverWorker class.
-
- Args:
- batch: list of object's sha1 that potentially need archival.
- """
- super().__init__(batch)
- config = self.config
- self.retention_policy = config['retention_policy']
- self.archival_max_age = config['archival_max_age']
- self.sources = config['sources']
-
- if len(self.objstorages) < self.retention_policy:
- raise ValueError('Retention policy is too high for the number of '
- 'provided servers')
-
- def need_archival(self, content_data):
- """ Indicate if the content need to be archived.
-
- Args:
- content_data (dict): dict that contains two lists 'present' and
- 'missing' with copies id corresponding to this status.
- Returns: True if there is not enough copies, False otherwise.
- """
- nb_presents = len(content_data.get('present', []))
- for copy, mtime in content_data.get('ongoing', {}).items():
- if not self._is_archival_delay_elapsed(mtime):
- nb_presents += 1
- return nb_presents < self.retention_policy
-
- def _is_archival_delay_elapsed(self, start_time):
- """ Indicates if the archival delay is elapsed given the start_time
-
- Args:
- start_time (float): time at which the archival started.
-
- Returns:
- True if the archival delay is elasped, False otherwise
- """
- elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - start_time
- return elapsed > datetime.timedelta(seconds=self.archival_max_age)
-
- def choose_backup_servers(self, present, missing):
- """Choose and yield the required amount of couple source/destination
-
- For each required copy, choose a unique destination server
- among the missing copies and a source server among the
- presents.
-
- Each destination server is unique so after archival, the
- retention policy requirement will be fulfilled. However, the
- source server may be used multiple times.
-
- Args:
- present: set of objstorage source name where the content
- is present
- missing: set of objstorage destination name where the
- content is missing
-
- Yields:
- tuple (source, destination) for each required copy.
-
- """
- # Transform from set to list to allow random selections
- missing = list(missing)
- present = list(present)
- all_sources = [source for source in present if source in self.sources]
- nb_required = self.retention_policy - len(present)
- destinations = random.sample(missing, nb_required)
- sources = [random.choice(all_sources) for dest in destinations]
- yield from zip(sources, destinations)
-
-
-class ArchiverToBackendWorker(BaseArchiveWorker):
- """Worker that sends copies over from a source to another backend.
-
- Process the content of a content batch from source objstorage to
- destination objstorage.
-
- """
-
- CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
-
- ADDITIONAL_CONFIG = {
- 'next_task': (
- 'dict', {
- 'queue': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask',
- 'batch_size': 10,
- }
- )
- }
-
- def __init__(self, destination, batch):
- """Constructor of the ArchiverWorkerToBackend class.
-
- Args:
- destination: where to copy the objects from
- batch: sha1s to send to destination
-
- """
- super().__init__(batch)
- self.destination = destination
- next_task = self.config['next_task']
- if next_task:
- destination_queue = next_task['queue']
- self.task_destination = get_task(destination_queue)
- self.batch_size = int(next_task['batch_size'])
- else:
- self.task_destination = self.batch_size = None
-
- def need_archival(self, content_data):
- """Indicate if the content needs to be archived.
-
- Args:
- content_data (dict): dict that contains 3 lists 'present',
- 'ongoing' and 'missing' with copies id corresponding to
- this status.
-
- Returns:
- True if we need to archive, False otherwise
-
- """
- return self.destination in content_data.get('missing', {})
-
- def choose_backup_servers(self, present, missing):
- """The destination is fixed to the destination mentioned.
-
- The only variable here is the source of information that we
- choose randomly in 'present'.
-
- Args:
- present: set of objstorage source name where the content
- is present
- missing: set of objstorage destination name where the
- content is missing
-
- Yields:
- tuple (source, destination) for each required copy.
-
- """
- yield (random.choice(list(present)), self.destination)
-
- def copy_finished(self, content_ids):
- """Once the copy is finished, we'll send those batch of contents as
- done in the destination queue.
-
- """
- if self.task_destination:
- groups = []
- for ids in utils.grouper(content_ids, self.batch_size):
- sig_ids = self.task_destination.s(list(ids))
- groups.append(sig_ids)
-
- group(groups).delay()
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
deleted file mode 100644
index 036d2121..00000000
--- a/swh/storage/tests/test_archiver.py
+++ /dev/null
@@ -1,486 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import glob
-import tempfile
-import shutil
-import unittest
-import os
-
-from nose.tools import istest
-from nose.plugins.attrib import attr
-
-from swh.core.tests.db_testing import DbsTestFixture
-
-from swh.storage.archiver.storage import get_archiver_storage
-
-from swh.storage.archiver import ArchiverWithRetentionPolicyDirector
-from swh.storage.archiver import ArchiverWithRetentionPolicyWorker
-from swh.storage.archiver.db import utcnow
-
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
-
-try:
- # objstorage > 0.17
- from swh.objstorage.api.server import make_app as app
- from server_testing import ServerTestFixtureAsync as ServerTestFixture
- MIGRATED = True
-except ImportError:
- # objstorage <= 0.17
- from swh.objstorage.api.server import app
- from server_testing import ServerTestFixture
- MIGRATED = False
-
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
-
-
-@attr('db')
-class TestArchiver(DbsTestFixture, ServerTestFixture,
- unittest.TestCase):
- """ Test the objstorage archiver.
- """
-
- TEST_DB_NAMES = [
- 'softwareheritage-archiver-test',
- ]
- TEST_DB_DUMPS = [
- os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'),
- ]
- TEST_DB_DUMP_TYPES = [
- 'pg_dump',
- ]
-
- def setUp(self):
- # Launch the backup server
- self.dest_root = tempfile.mkdtemp(prefix='remote')
- self.config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.dest_root,
- 'slicing': '0:2/2:4/4:6',
- }
- }
- if MIGRATED:
- self.app = app(self.config)
- else:
- self.app = app
- super().setUp()
-
- # Retrieve connection (depends on the order in TEST_DB_NAMES)
- self.conn = self.conns[0] # archiver db's connection
- self.cursor = self.cursors[0]
-
- # Create source storage
- self.src_root = tempfile.mkdtemp()
- src_config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.src_root,
- 'slicing': '0:2/2:4/4:6'
- }
- }
- self.src_storage = get_objstorage(**src_config)
-
- # Create destination storage
- dest_config = {
- 'cls': 'remote',
- 'args': {
- 'url': self.url()
- }
- }
- self.dest_storage = get_objstorage(**dest_config)
-
- # Keep mapped the id to the storages
- self.storages = {
- 'uffizi': self.src_storage,
- 'banco': self.dest_storage
- }
-
- # Override configurations
- src_archiver_conf = {'host': 'uffizi'}
- dest_archiver_conf = {'host': 'banco'}
- src_archiver_conf.update(src_config)
- dest_archiver_conf.update(dest_config)
- self.archiver_storages = [src_archiver_conf, dest_archiver_conf]
- self._override_director_config()
- self._override_worker_config()
- # Create the base archiver
- self.archiver = self._create_director()
-
- def tearDown(self):
- self.empty_tables()
- shutil.rmtree(self.src_root)
- shutil.rmtree(self.dest_root)
- super().tearDown()
-
- def empty_tables(self):
- # Remove all content
- self.cursor.execute('DELETE FROM content')
- self.cursor.execute('DELETE FROM content_copies')
- self.conn.commit()
-
- def _override_director_config(self, retention_policy=2):
- """ Override the default config of the Archiver director
- to allow the tests to use the *-test db instead of the default one as
- there is no configuration file for now.
- """
- ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa
- 'archiver_storage': {
- 'cls': 'db',
- 'args': {
- 'dbconn': self.conn,
- },
- },
- 'batch_max_size': 5000,
- 'archival_max_age': 3600,
- 'retention_policy': retention_policy,
- 'asynchronous': False,
- 'max_queue_length': 100000,
- 'queue_throttling_delay': 120,
- }
-
- def _override_worker_config(self):
- """ Override the default config of the Archiver worker
- to allow the tests to use the *-test db instead of the default one as
- there is no configuration file for now.
- """
- ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa
- 'retention_policy': 2,
- 'archival_max_age': 3600,
- 'archiver_storage': {
- 'cls': 'db',
- 'args': {
- 'dbconn': self.conn,
- },
- },
- 'storages': self.archiver_storages,
- 'source': 'uffizi',
- 'sources': ['uffizi'],
- }
-
- def _create_director(self):
- return ArchiverWithRetentionPolicyDirector(start_id=None)
-
- def _create_worker(self, batch={}):
- return ArchiverWithRetentionPolicyWorker(batch)
-
- def _add_content(self, storage_name, content_data):
- """ Add really a content to the given objstorage
-
- This put an empty status for the added content.
-
- Args:
- storage_name: the concerned storage
- content_data: the data to insert
- with_row_insert: to insert a row entry in the db or not
-
- """
- # Add the content to the storage
- obj_id = self.storages[storage_name].add(content_data)
- self.cursor.execute(""" INSERT INTO content (sha1)
- VALUES (%s)
- """, (obj_id,))
- return obj_id
-
- def _update_status(self, obj_id, storage_name, status, date=None):
- """ Update the db status for the given id/storage_name.
-
- This does not create the content in the storage.
- """
- self.cursor.execute("""insert into archive (name)
- values (%s)
- on conflict do nothing""", (storage_name,))
-
- self.archiver.archiver_storage.content_archive_update(
- obj_id, storage_name, status
- )
-
- # Integration test
- @istest
- def archive_missing_content(self):
- """ Run archiver on a missing content should archive it.
- """
- obj_data = b'archive_missing_content'
- obj_id = self._add_content('uffizi', obj_data)
- self._update_status(obj_id, 'uffizi', 'present')
- # Content is missing on banco (entry not present in the db)
- try:
- self.dest_storage.get(obj_id)
- except ObjNotFoundError:
- pass
- else:
- self.fail('Content should not be present before archival')
- self.archiver.run()
- # now the content should be present on remote objstorage
- remote_data = self.dest_storage.get(obj_id)
- self.assertEquals(obj_data, remote_data)
-
- @istest
- def archive_present_content(self):
- """ A content that is not 'missing' shouldn't be archived.
- """
- obj_id = self._add_content('uffizi', b'archive_present_content')
- self._update_status(obj_id, 'uffizi', 'present')
- self._update_status(obj_id, 'banco', 'present')
- # After the run, the content should NOT be in the archive.
- # As the archiver believe it was already in.
- self.archiver.run()
- with self.assertRaises(ObjNotFoundError):
- self.dest_storage.get(obj_id)
-
- @istest
- def archive_already_enough(self):
- """ A content missing with enough copies shouldn't be archived.
- """
- obj_id = self._add_content('uffizi', b'archive_alread_enough')
- self._update_status(obj_id, 'uffizi', 'present')
- self._override_director_config(retention_policy=1)
- director = self._create_director()
- # Obj is present in only one archive but only one copy is required.
- director.run()
- with self.assertRaises(ObjNotFoundError):
- self.dest_storage.get(obj_id)
-
- @istest
- def content_archive_get_copies(self):
- self.assertCountEqual(
- self.archiver.archiver_storage.content_archive_get_copies(),
- [],
- )
- obj_id = self._add_content('uffizi', b'archive_alread_enough')
- self._update_status(obj_id, 'uffizi', 'present')
- self.assertCountEqual(
- self.archiver.archiver_storage.content_archive_get_copies(),
- [(obj_id, ['uffizi'], {})],
- )
-
- # Unit tests for archive worker
-
- def archival_elapsed(self, mtime):
- return self._create_worker()._is_archival_delay_elapsed(mtime)
-
- @istest
- def vstatus_ongoing_remaining(self):
- self.assertFalse(self.archival_elapsed(utcnow()))
-
- @istest
- def vstatus_ongoing_elapsed(self):
- past_time = (utcnow()
- - datetime.timedelta(
- seconds=self._create_worker().archival_max_age))
- self.assertTrue(self.archival_elapsed(past_time))
-
- @istest
- def need_archival_missing(self):
- """ A content should need archival when it is missing.
- """
- status_copies = {'present': ['uffizi'], 'missing': ['banco']}
- worker = self._create_worker()
- self.assertEqual(worker.need_archival(status_copies),
- True)
-
- @istest
- def need_archival_present(self):
- """ A content present everywhere shouldn't need archival
- """
- status_copies = {'present': ['uffizi', 'banco']}
- worker = self._create_worker()
- self.assertEqual(worker.need_archival(status_copies),
- False)
-
- def _compute_copies_status(self, status):
- """ A content with a given status should be detected correctly
- """
- obj_id = self._add_content(
- 'banco', b'compute_copies_' + bytes(status, 'utf8'))
- self._update_status(obj_id, 'banco', status)
- worker = self._create_worker()
- self.assertIn('banco', worker.compute_copies(
- set(worker.objstorages), obj_id)[status])
-
- @istest
- def compute_copies_present(self):
- """ A present content should be detected with correct status
- """
- self._compute_copies_status('present')
-
- @istest
- def compute_copies_missing(self):
- """ A missing content should be detected with correct status
- """
- self._compute_copies_status('missing')
-
- @istest
- def compute_copies_extra_archive(self):
- obj_id = self._add_content('banco', b'foobar')
- self._update_status(obj_id, 'banco', 'present')
- self._update_status(obj_id, 'random_archive', 'present')
- worker = self._create_worker()
- copies = worker.compute_copies(set(worker.objstorages), obj_id)
- self.assertEqual(copies['present'], {'banco'})
- self.assertEqual(copies['missing'], {'uffizi'})
-
- def _get_backups(self, present, missing):
- """ Return a list of the pair src/dest from the present and missing
- """
- worker = self._create_worker()
- return list(worker.choose_backup_servers(present, missing))
-
- @istest
- def choose_backup_servers(self):
- self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0)
- self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1)
- # Even with more possible destinations, do not take more than the
- # retention_policy require
- self.assertEqual(
- len(self._get_backups(['uffizi'], ['banco', 's3'])),
- 1
- )
-
-
-class TestArchiverStorageStub(unittest.TestCase):
- def setUp(self):
- self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local')
- self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote')
- self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log')
-
- src_config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.src_root,
- 'slicing': '0:2/2:4/4:6'
- }
- }
- self.src_storage = get_objstorage(**src_config)
-
- # Create destination storage
- dest_config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.dest_root,
- 'slicing': '0:2/2:4/4:6'
- }
- }
- self.dest_storage = get_objstorage(**dest_config)
-
- self.config = {
- 'cls': 'stub',
- 'args': {
- 'archives': {
- 'present_archive': 'http://uffizi:5003',
- 'missing_archive': 'http://banco:5003',
- },
- 'present': ['present_archive'],
- 'missing': ['missing_archive'],
- 'logfile_base': os.path.join(self.log_root, 'log_'),
- }
- }
-
- # Generated with:
- #
- # id_length = 20
- # random.getrandbits(8 * id_length).to_bytes(id_length, 'big')
- #
- self.content_ids = [
- b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f",
- b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d',
- b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83',
- b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2',
- b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc',
- ]
-
- self.archiver_storage = get_archiver_storage(**self.config)
- super().setUp()
-
- def tearDown(self):
- shutil.rmtree(self.src_root)
- shutil.rmtree(self.dest_root)
- shutil.rmtree(self.log_root)
- super().tearDown()
-
- @istest
- def archive_ls(self):
- self.assertCountEqual(
- self.archiver_storage.archive_ls(),
- self.config['args']['archives'].items()
- )
-
- @istest
- def content_archive_get(self):
- for content_id in self.content_ids:
- self.assertEqual(
- self.archiver_storage.content_archive_get(content_id),
- (content_id, set(self.config['args']['present']), {}),
- )
-
- @istest
- def content_archive_get_copies(self):
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_copies(),
- [],
- )
-
- @istest
- def content_archive_get_unarchived_copies(self):
- retention_policy = 2
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_unarchived_copies(
- retention_policy),
- [],
- )
-
- @istest
- def content_archive_get_missing(self):
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_missing(
- self.content_ids,
- 'missing_archive'
- ),
- self.content_ids,
- )
-
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_missing(
- self.content_ids,
- 'present_archive'
- ),
- [],
- )
-
- with self.assertRaises(ValueError):
- list(self.archiver_storage.content_archive_get_missing(
- self.content_ids,
- 'unknown_archive'
- ))
-
- @istest
- def content_archive_get_unknown(self):
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_unknown(
- self.content_ids,
- ),
- [],
- )
-
- @istest
- def content_archive_update(self):
- for content_id in self.content_ids:
- self.archiver_storage.content_archive_update(
- content_id, 'present_archive', 'present')
- self.archiver_storage.content_archive_update(
- content_id, 'missing_archive', 'present')
-
- self.archiver_storage.close_logfile()
-
- # Make sure we created a logfile
- files = glob.glob('%s*' % self.config['args']['logfile_base'])
- self.assertEqual(len(files), 1)
-
- # make sure the logfile contains all our lines
- lines = open(files[0]).readlines()
- self.assertEqual(len(lines), 2 * len(self.content_ids))
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jun 21, 5:07 PM (2 w, 57 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3255572
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment