
diff --git a/debian/control b/debian/control
index 18b99f50..d3dfcc1f 100644
--- a/debian/control
+++ b/debian/control
@@ -1,57 +1,48 @@
Source: swh-storage
Maintainer: Software Heritage developers <swh-devel@inria.fr>
Section: python
Priority: optional
Build-Depends: debhelper (>= 9),
dh-python,
python3-all,
python3-click,
python3-dateutil,
python3-flask,
python3-nose,
python3-psycopg2,
python3-requests,
python3-setuptools,
python3-swh.core (>= 0.0.28~),
python3-swh.model (>= 0.0.15~),
python3-swh.objstorage (>= 0.0.17~),
python3-swh.scheduler (>= 0.0.14~),
python3-aiohttp,
python3-vcversioner
Standards-Version: 3.9.6
Homepage: https://forge.softwareheritage.org/diffusion/DSTO/
Package: python3-swh.storage
Architecture: all
Depends: python3-swh.core (>= 0.0.28~),
python3-swh.model (>= 0.0.15~),
python3-swh.objstorage (>= 0.0.17~),
${misc:Depends},
${python3:Depends}
Description: Software Heritage storage utilities
Package: python3-swh.storage.listener
Architecture: all
Depends: python3-swh.journal (>= 0.0.2~),
python3-kafka (>= 1.3.1~),
python3-swh.storage (= ${binary:Version}),
${misc:Depends},
${python3:Depends}
Description: Software Heritage storage listener
-Package: python3-swh.storage.archiver
-Architecture: all
-Depends: python3-swh.scheduler (>= 0.0.14~),
- python3-swh.journal,
- python3-swh.storage (= ${binary:Version}),
- ${misc:Depends},
- ${python3:Depends}
-Description: Software Heritage storage Archiver
-
Package: python3-swh.storage.provenance
Architecture: all
Depends: python3-swh.scheduler (>= 0.0.14~),
python3-swh.storage (= ${binary:Version}),
${misc:Depends},
${python3:Depends}
Description: Software Heritage storage Provenance
diff --git a/debian/rules b/debian/rules
index 5dbaab1b..cf9189bd 100755
--- a/debian/rules
+++ b/debian/rules
@@ -1,26 +1,23 @@
#!/usr/bin/make -f
export PYBUILD_NAME=swh.storage
%:
dh $@ --with python3 --buildsystem=pybuild
override_dh_install:
dh_install
for pyvers in $(shell py3versions -vr); do \
mkdir -p $(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \
mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/listener.py \
$(CURDIR)/debian/python3-swh.storage.listener/usr/lib/python$$pyvers/dist-packages/swh/storage/ ; \
- mkdir -p $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver ; \
- mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/* \
- $(CURDIR)/debian/python3-swh.storage.archiver/usr/lib/python$$pyvers/dist-packages/swh/storage/archiver/ ; \
mkdir -p $(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance ; \
mv $(CURDIR)/debian/python3-swh.storage/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/* \
$(CURDIR)/debian/python3-swh.storage.provenance/usr/lib/python$$pyvers/dist-packages/swh/storage/provenance/ ; \
done
override_dh_auto_test:
PYBUILD_SYSTEM=custom \
PYBUILD_TEST_ARGS="cd {build_dir}; python{version} -m nose swh -sva '!db'" \
dh_auto_test
diff --git a/sql/archiver/Makefile b/sql/archiver/Makefile
deleted file mode 100644
index c132dbcc..00000000
--- a/sql/archiver/Makefile
+++ /dev/null
@@ -1,42 +0,0 @@
-# Depends: postgresql-client, postgresql-autodoc
-
-DBNAME = softwareheritage-archiver-dev
-DOCDIR = autodoc
-
-SQL_INIT = ../swh-init.sql
-SQL_SCHEMA = swh-archiver-schema.sql
-SQL_FUNC = swh-archiver-func.sql
-SQL_DATA = swh-archiver-data.sql
-SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_DATA)
-
-PSQL_BIN = psql
-PSQL_FLAGS = --single-transaction --echo-all -X
-PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
-
-
-all:
-
-createdb: createdb-stamp
-createdb-stamp: $(SQL_INIT)
- createdb $(DBNAME)
- touch $@
-
-filldb: filldb-stamp
-filldb-stamp: createdb-stamp
- cat $(SQLS) | $(PSQL) $(DBNAME)
- touch $@
-
-dropdb:
- -dropdb $(DBNAME)
-
-dumpdb: swh.dump
-swh.dump: filldb-stamp
- pg_dump -Fc $(DBNAME) > $@
-
-clean:
- rm -rf *-stamp $(DOCDIR)/
-
-distclean: clean dropdb
- rm -f swh.dump
-
-.PHONY: all createdb filldb dumpdb dropdb clean distclean
diff --git a/sql/archiver/swh-archiver-data.sql b/sql/archiver/swh-archiver-data.sql
deleted file mode 100644
index e4a70a25..00000000
--- a/sql/archiver/swh-archiver-data.sql
+++ /dev/null
@@ -1,3 +0,0 @@
-INSERT INTO archive(name) VALUES('uffizi');
-INSERT INTO archive(name) VALUES('banco');
-INSERT INTO archive(name) VALUES('azure');
diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql
deleted file mode 100644
index 750bfae2..00000000
--- a/sql/archiver/swh-archiver-func.sql
+++ /dev/null
@@ -1,40 +0,0 @@
-create or replace function swh_mktemp_content()
- returns void
- language plpgsql
-as $$
- begin
- create temporary table tmp_content (
- sha1 sha1 not null
- ) on commit drop;
- return;
- end
-$$;
-
-create or replace function swh_content_copies_from_temp(archive_names text[])
- returns void
- language plpgsql
-as $$
- begin
- with existing_content_ids as (
- select id
- from content
- inner join tmp_content on content.sha1 = tmp_content.sha1
- ), created_content_ids as (
- insert into content (sha1)
- select sha1 from tmp_content
- on conflict do nothing
- returning id
- ), content_ids as (
- select * from existing_content_ids
- union all
- select * from created_content_ids
- ), archive_ids as (
- select id from archive
- where name = any(archive_names)
- ) insert into content_copies (content_id, archive_id, mtime, status)
- select content_ids.id, archive_ids.id, now(), 'present'
- from content_ids cross join archive_ids
- on conflict (content_id, archive_id) do update
- set mtime = excluded.mtime, status = excluded.status;
- end
-$$;
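
A minimal sketch of how a client could have driven the two functions above with
psycopg2 (the connection string, sha1 values and helper name are illustrative
assumptions; the database must already hold the archiver schema):

import psycopg2

def record_present_copies(dsn, sha1s, archive_names):
    # Fill tmp_content with the sha1s, then mark them 'present' in the
    # given archives via swh_content_copies_from_temp().
    with psycopg2.connect(dsn) as db:
        with db.cursor() as cur:
            cur.execute("select swh_mktemp_content()")
            cur.executemany("insert into tmp_content (sha1) values (%s)",
                            [(psycopg2.Binary(s),) for s in sha1s])
            cur.execute("select swh_content_copies_from_temp(%s)",
                        (archive_names,))

# record_present_copies('dbname=softwareheritage-archiver-dev',
#                       [bytes(20)], ['uffizi', 'banco'])
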
diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql
deleted file mode 100644
index e5849585..00000000
--- a/sql/archiver/swh-archiver-schema.sql
+++ /dev/null
@@ -1,63 +0,0 @@
--- In order to archive the content of the object storage, add
--- some tables to keep track of what has already been archived.
-
-create table dbversion
-(
- version int primary key,
- release timestamptz,
- description text
-);
-
-comment on table dbversion is 'Schema update tracking';
-
-INSERT INTO dbversion(version, release, description)
-VALUES(10, now(), 'Work In Progress');
-
-CREATE TABLE archive (
- id bigserial PRIMARY KEY,
- name text not null
-);
-
-create unique index on archive(name);
-
-comment on table archive is 'The archives in which contents are stored';
-comment on column archive.id is 'Short identifier for archives';
-comment on column archive.name is 'Name of the archive';
-
-CREATE TYPE archive_status AS ENUM (
- 'missing',
- 'ongoing',
- 'present',
- 'corrupted'
-);
-
-comment on type archive_status is 'Status of a given copy of a content';
-
--- a SHA1 checksum (not necessarily originating from Git)
-CREATE DOMAIN sha1 AS bytea CHECK (LENGTH(VALUE) = 20);
-
--- a bucket for which we count items
-CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2);
-
-create table content (
- id bigserial primary key,
- sha1 sha1 not null
-);
-
-comment on table content is 'All the contents being archived by Software Heritage';
-comment on column content.id is 'Short id for the content being archived';
-comment on column content.sha1 is 'SHA1 hash of the content being archived';
-
-create unique index on content(sha1);
-
-create table content_copies (
- content_id bigint not null, -- references content(id)
- archive_id bigint not null, -- references archive(id)
- mtime timestamptz,
- status archive_status not null,
- primary key (content_id, archive_id)
-);
-
-comment on table content_copies is 'Tracking of all content copies in the archives';
-comment on column content_copies.mtime is 'Last update time of the copy';
-comment on column content_copies.status is 'Status of the copy';
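
For reference, a short psycopg2 sketch (same assumed database as above) reading
the archival status of one content straight from this schema:

import psycopg2

def copies_of(dsn, sha1):
    # Returns [(archive_name, status, mtime), ...] for the given sha1.
    query = """select archive.name, content_copies.status, content_copies.mtime
               from content_copies
               join archive on archive.id = content_copies.archive_id
               join content on content.id = content_copies.content_id
               where content.sha1 = %s"""
    with psycopg2.connect(dsn) as db, db.cursor() as cur:
        cur.execute(query, (psycopg2.Binary(sha1),))
        return cur.fetchall()
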
diff --git a/sql/archiver/upgrades/002.sql b/sql/archiver/upgrades/002.sql
deleted file mode 100644
index d83db028..00000000
--- a/sql/archiver/upgrades/002.sql
+++ /dev/null
@@ -1,9 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 1
--- to_version: 2
--- description: Add a 'corrupted' status into the archive_content status
-
-INSERT INTO dbversion(version, release, description)
-VALUES(2, now(), 'Work In Progress');
-
-ALTER TYPE archive_status ADD VALUE 'corrupted';
diff --git a/sql/archiver/upgrades/003.sql b/sql/archiver/upgrades/003.sql
deleted file mode 100644
index ba43f526..00000000
--- a/sql/archiver/upgrades/003.sql
+++ /dev/null
@@ -1,25 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 2
--- to_version: 3
--- description: Add a 'num_present' cache column into the archive_content status
-
-INSERT INTO dbversion(version, release, description)
-VALUES(3, now(), 'Work In Progress');
-
-alter table content_archive add column num_present int default null;
-comment on column content_archive.num_present is 'Number of copies marked as present (cache updated via trigger)';
-
-create index concurrently on content_archive(num_present);
-
--- Keep the num_copies cache updated
-CREATE FUNCTION update_num_present() RETURNS TRIGGER AS $$
- BEGIN
- NEW.num_present := (select count(*) from jsonb_each(NEW.copies) where value->>'status' = 'present');
- RETURN new;
- END;
-$$ LANGUAGE PLPGSQL;
-
-CREATE TRIGGER update_num_present
- BEFORE INSERT OR UPDATE OF copies ON content_archive
- FOR EACH ROW
- EXECUTE PROCEDURE update_num_present();
diff --git a/sql/archiver/upgrades/004.sql b/sql/archiver/upgrades/004.sql
deleted file mode 100644
index bfb5ad31..00000000
--- a/sql/archiver/upgrades/004.sql
+++ /dev/null
@@ -1,44 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 3
--- to_version: 4
--- description: Add azure instance
-
-INSERT INTO dbversion(version, release, description)
-VALUES(4, now(), 'Work In Progress');
-
-ALTER TABLE archive DROP COLUMN url;
-ALTER TABLE archive ALTER COLUMN id SET DATA TYPE TEXT;
-
-INSERT INTO archive(id) VALUES ('azure');
-
-create or replace function swh_mktemp_content_archive()
- returns void
- language sql
-as $$
- create temporary table tmp_content_archive (
- like content_archive including defaults
- ) on commit drop;
- alter table tmp_content_archive drop column copies;
- alter table tmp_content_archive drop column num_present;
-$$;
-
-COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive';
-
-create or replace function swh_content_archive_missing(backend_name text)
- returns setof sha1
- language plpgsql
-as $$
-begin
- return query
- select content_id
- from tmp_content_archive tmp where exists (
- select 1
- from content_archive c
- where tmp.content_id = c.content_id
- and (not c.copies ? backend_name
- or c.copies @> jsonb_build_object(backend_name, '{"status": "missing"}'::jsonb))
- );
-end
-$$;
-
-COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend';
diff --git a/sql/archiver/upgrades/005.sql b/sql/archiver/upgrades/005.sql
deleted file mode 100644
index bc50631c..00000000
--- a/sql/archiver/upgrades/005.sql
+++ /dev/null
@@ -1,24 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 4
--- to_version: 5
--- description: List unknown sha1s from content_archive
-
-INSERT INTO dbversion(version, release, description)
-VALUES(5, now(), 'Work In Progress');
-
-create or replace function swh_content_archive_unknown()
- returns setof sha1
- language plpgsql
-as $$
-begin
- return query
- select content_id
- from tmp_content_archive tmp where not exists (
- select 1
- from content_archive c
- where tmp.content_id = c.content_id
- );
-end
-$$;
-
-COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1';
diff --git a/sql/archiver/upgrades/006.sql b/sql/archiver/upgrades/006.sql
deleted file mode 100644
index d9d1b24c..00000000
--- a/sql/archiver/upgrades/006.sql
+++ /dev/null
@@ -1,100 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 5
--- to_version: 6
--- description: Create a bucketed count of contents in the archive.
-
-INSERT INTO dbversion(version, release, description)
-VALUES(6, now(), 'Work In Progress');
-
--- a bucket for which we count items
-CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2);
-
-CREATE TABLE content_archive_counts (
- archive text not null references archive(id),
- bucket bucket not null,
- count bigint,
- primary key (archive, bucket)
-);
-
-comment on table content_archive_counts is 'Bucketed count of archive contents';
-comment on column content_archive_counts.archive is 'the archive for which we''re counting';
-comment on column content_archive_counts.bucket is 'the bucket of items we''re counting';
-comment on column content_archive_counts.count is 'the number of items counted in the given bucket';
-
-
-CREATE FUNCTION count_copies(from_id bytea, to_id bytea) returns void language sql as $$
- with sample as (
- select content_id, copies from content_archive
- where content_id > from_id and content_id <= to_id
- ), data as (
- select substring(content_id from 19) as bucket, jbe.key as archive
- from sample
- join lateral jsonb_each(copies) jbe on true
- where jbe.value->>'status' = 'present'
- ), bucketed as (
- select bucket, archive, count(*) as count
- from data
- group by bucket, archive
- ) update content_archive_counts cac set
- count = cac.count + bucketed.count
- from bucketed
- where cac.archive = bucketed.archive and cac.bucket = bucketed.bucket;
-$$;
-
-comment on function count_copies(bytea, bytea) is 'Count the objects between from_id and to_id, add the results to content_archive_counts';
-
-CREATE FUNCTION init_content_archive_counts() returns void language sql as $$
- insert into content_archive_counts (
- select id, decode(lpad(to_hex(bucket), 4, '0'), 'hex')::bucket as bucket, 0 as count
- from archive join lateral generate_series(0, 65535) bucket on true
- ) on conflict (archive, bucket) do nothing;
-$$;
-
-comment on function init_content_archive_counts() is 'Initialize the content archive counts for the registered archives';
-
--- keep the content_archive_counts updated
-CREATE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
- DECLARE
- content_id sha1;
- content_bucket bucket;
- copies record;
- old_row content_archive;
- new_row content_archive;
- BEGIN
- -- default values for old or new row depending on trigger type
- if tg_op = 'INSERT' then
- old_row := (null::sha1, '{}'::jsonb, 0);
- else
- old_row := old;
- end if;
- if tg_op = 'DELETE' then
- new_row := (null::sha1, '{}'::jsonb, 0);
- else
- new_row := new;
- end if;
-
- -- get the content bucket
- content_id := coalesce(old_row.content_id, new_row.content_id);
- content_bucket := substring(content_id from 19)::bucket;
-
- -- compare copies present in old and new row for each archive type
- FOR copies IN
- select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status
- from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key
- LOOP
- -- the count didn't change
- CONTINUE WHEN copies.old_status is distinct from copies.new_status OR
- (copies.old_status != 'present' AND copies.new_status != 'present');
-
- update content_archive_counts cac
- set count = count + (case when copies.old_status = 'present' then -1 else 1 end)
- where archive = copies.archive and bucket = content_bucket;
- END LOOP;
- return null;
- END;
-$$;
-
-create trigger update_content_archive_counts
- AFTER INSERT OR UPDATE OR DELETE ON content_archive
- FOR EACH ROW
- EXECUTE PROCEDURE update_content_archive_counts();
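
The counts above are kept per 2-byte bucket derived from the sha1; a pure-Python
sketch of that bucketing (no database needed):

def bucket_of(sha1: bytes) -> bytes:
    # substring(content_id from 19) in SQL is 1-indexed: bytes 19 and 20,
    # i.e. the last two bytes of the 20-byte sha1.
    assert len(sha1) == 20
    return sha1[18:20]

# init_content_archive_counts() seeds one row per archive for each of the
# 65536 possible buckets:
all_buckets = [i.to_bytes(2, 'big') for i in range(65536)]
assert bucket_of(bytes(range(20))) == b'\x12\x13'
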
diff --git a/sql/archiver/upgrades/007.sql b/sql/archiver/upgrades/007.sql
deleted file mode 100644
index 34049426..00000000
--- a/sql/archiver/upgrades/007.sql
+++ /dev/null
@@ -1,21 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 6
--- to_version: 7
--- description: Add a function to compute archive counts
-
-INSERT INTO dbversion(version, release, description)
-VALUES(7, now(), 'Work In Progress');
-
-create type content_archive_count as (
- archive text,
- count bigint
-);
-
-create or replace function get_content_archive_counts() returns setof content_archive_count language sql as $$
- select archive, sum(count)::bigint
- from content_archive_counts
- group by archive
- order by archive;
-$$;
-
-comment on function get_content_archive_counts() is 'Get count for each archive';
diff --git a/sql/archiver/upgrades/008.sql b/sql/archiver/upgrades/008.sql
deleted file mode 100644
index 6527aca6..00000000
--- a/sql/archiver/upgrades/008.sql
+++ /dev/null
@@ -1,49 +0,0 @@
--- SWH DB schema upgrade
--- from_version: 7
--- to_version: 8
--- description: Fix silly bug in update_content_archive_counts
-
-INSERT INTO dbversion(version, release, description)
-VALUES(8, now(), 'Work In Progress');
-
--- keep the content_archive_counts updated
-CREATE OR REPLACE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
- DECLARE
- content_id sha1;
- content_bucket bucket;
- copies record;
- old_row content_archive;
- new_row content_archive;
- BEGIN
- -- default values for old or new row depending on trigger type
- if tg_op = 'INSERT' then
- old_row := (null::sha1, '{}'::jsonb, 0);
- else
- old_row := old;
- end if;
- if tg_op = 'DELETE' then
- new_row := (null::sha1, '{}'::jsonb, 0);
- else
- new_row := new;
- end if;
-
- -- get the content bucket
- content_id := coalesce(old_row.content_id, new_row.content_id);
- content_bucket := substring(content_id from 19)::bucket;
-
- -- compare copies present in old and new row for each archive type
- FOR copies IN
- select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status
- from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key
- LOOP
- -- the count didn't change
- CONTINUE WHEN copies.old_status is not distinct from copies.new_status OR
- (copies.old_status != 'present' AND copies.new_status != 'present');
-
- update content_archive_counts cac
- set count = count + (case when copies.old_status = 'present' then -1 else 1 end)
- where archive = copies.archive and bucket = content_bucket;
- END LOOP;
- return null;
- END;
-$$;
diff --git a/sql/archiver/upgrades/009.sql b/sql/archiver/upgrades/009.sql
deleted file mode 100644
index 5a3133ba..00000000
--- a/sql/archiver/upgrades/009.sql
+++ /dev/null
@@ -1,42 +0,0 @@
--- SWH Archiver DB schema upgrade
--- from_version: 8
--- to_version: 9
--- description: Add helper functions to create temporary table and insert new entries in content_archive table
-
-insert into dbversion(version, release, description)
-values(9, now(), 'Work In Progress');
-
--- create a temporary table called tmp_TBLNAME, mimicking existing
--- table TBLNAME
-create or replace function swh_mktemp(tblname regclass)
- returns void
- language plpgsql
-as $$
-begin
- execute format('
- create temporary table tmp_%1$I
- (like %1$I including defaults)
- on commit drop;
- ', tblname);
- return;
-end
-$$;
-
-comment on function swh_mktemp(regclass) is 'Helper function to create a temporary table mimicking the existing one';
-
--- Helper function to insert new entries in content_archive from a
--- temporary table skipping duplicates.
-create or replace function swh_content_archive_add()
- returns void
- language plpgsql
-as $$
-begin
- insert into content_archive (content_id, copies, num_present)
- select distinct content_id, copies, num_present
- from tmp_content_archive
- on conflict(content_id) do nothing;
- return;
-end
-$$;
-
-comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive';
diff --git a/swh/storage/archiver/__init__.py b/swh/storage/archiver/__init__.py
deleted file mode 100644
index 2ff1cce1..00000000
--- a/swh/storage/archiver/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from .director import ArchiverWithRetentionPolicyDirector # NOQA
-from .director import ArchiverStdinToBackendDirector # NOQA
-from .worker import ArchiverWithRetentionPolicyWorker # NOQA
-from .worker import ArchiverToBackendWorker # NOQA
-from .copier import ArchiverCopier # NOQA
diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
deleted file mode 100644
index 1832e2bd..00000000
--- a/swh/storage/archiver/copier.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-import logging
-
-from swh.objstorage.exc import ObjNotFoundError
-from swh.model import hashutil
-
-logger = logging.getLogger('archiver.worker.copier')
-
-
-class ArchiverCopier():
- """ This archiver copies some files into a remote objstorage
- in order to get a backup.
- """
- def __init__(self, source, destination, content_ids):
- """ Create a Copier for the archiver
-
- Args:
- source (ObjStorage): source storage to get the contents.
- destination (ObjStorage): Storage where the contents will
- be copied.
- content_ids: list of content's id to archive.
- """
- self.source = source
- self.destination = destination
- self.content_ids = content_ids
-
- def run(self):
- """ Do the copy on the backup storage.
-
- Run the archiver copier in order to copy the required content
- into the current destination.
- The contents corresponding to the sha1s in self.content_ids
- will be fetched from the master_storage and then copied into
- the backup object storage.
-
- Returns:
- A boolean that indicates whether the whole content has been copied.
- """
- try:
- for content_id in self.content_ids:
- try:
- content = self.source.get(content_id)
- except ObjNotFoundError:
- logger.error('content %s not found' %
- hashutil.hash_to_hex(content_id))
- continue
- self.destination.add(content, content_id)
- except Exception as e:
- logger.exception('Problem during copy: %s' % e)
- return False
- return True
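
A minimal usage sketch of the removed copier (the two throw-away pathslicing
storages under /tmp are illustrative assumptions; a real deployment pointed at
the 'storages' entries of the archiver worker configuration):

import hashlib
import os

from swh.objstorage import get_objstorage
from swh.storage.archiver import ArchiverCopier

for d in ('/tmp/swh-src-objects', '/tmp/swh-dst-objects'):
    os.makedirs(d, exist_ok=True)

src = get_objstorage('pathslicing', {'root': '/tmp/swh-src-objects',
                                     'slicing': '0:2/2:4/4:6'})
dst = get_objstorage('pathslicing', {'root': '/tmp/swh-dst-objects',
                                     'slicing': '0:2/2:4/4:6'})

content = b'some content'
obj_id = hashlib.sha1(content).digest()  # 20-byte id, as used by the archiver
src.add(content, obj_id)

copied = ArchiverCopier(source=src, destination=dst,
                        content_ids=[obj_id]).run()
assert copied and obj_id in dst
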
diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py
deleted file mode 100644
index d9e5e439..00000000
--- a/swh/storage/archiver/db.py
+++ /dev/null
@@ -1,224 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-
-from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure
-
-
-def utcnow():
- return datetime.datetime.now(tz=datetime.timezone.utc)
-
-
-class ArchiverDb(BaseDb):
- """Proxy to the SWH's archiver DB
-
- """
-
- def archive_ls(self, cur=None):
- """ Get all the archives registered on the server.
-
- Yields:
- a tuple (server_id, server_url) for each archive server.
- """
- cur = self._cursor(cur)
- cur.execute("SELECT * FROM archive")
- yield from cursor_to_bytes(cur)
-
- def content_archive_get(self, content_id, cur=None):
- """ Get the archival status of a content.
-
- Retrieve from the database the archival status of the given content
- across all registered archives.
-
- Args:
- content_id: the sha1 of the content.
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
- """
- query = """select archive.name, status, mtime
- from content_copies
- left join archive on content_copies.archive_id = archive.id
- where content_copies.content_id = (
- select id from content where sha1 = %s)
- """
- cur = self._cursor(cur)
- cur.execute(query, (content_id,))
- rows = cur.fetchall()
- if not rows:
- return None
- present = []
- ongoing = {}
- for archive, status, mtime in rows:
- if status == 'present':
- present.append(archive)
- elif status == 'ongoing':
- ongoing[archive] = mtime
- return (content_id, present, ongoing)
-
- def content_archive_get_copies(self, last_content=None, limit=1000,
- cur=None):
- """Get the list of copies for `limit` contents starting after
- `last_content`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
-
- vars = {
- 'limit': limit,
- }
-
- if last_content is None:
- last_content_clause = 'true'
- else:
- last_content_clause = """content_id > (
- select id from content
- where sha1 = %(last_content)s)"""
- vars['last_content'] = last_content
-
- query = """select
- (select sha1 from content where id = content_id),
- array_agg((select name from archive
- where id = archive_id))
- from content_copies
- where status = 'present' and %s
- group by content_id
- order by content_id
- limit %%(limit)s""" % last_content_clause
-
- cur = self._cursor(cur)
- cur.execute(query, vars)
- for content_id, present in cursor_to_bytes(cur):
- yield (content_id, present, {})
-
- def content_archive_get_unarchived_copies(
- self, retention_policy, last_content=None,
- limit=1000, cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`. Yields only copies with a number of present
- copies smaller than `retention_policy`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- retention_policy: number of required present copies
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
-
- vars = {
- 'limit': limit,
- 'retention_policy': retention_policy,
- }
-
- if last_content is None:
- last_content_clause = 'true'
- else:
- last_content_clause = """content_id > (
- select id from content
- where sha1 = %(last_content)s)"""
- vars['last_content'] = last_content
-
- query = """select
- (select sha1 from content where id = content_id),
- array_agg((select name from archive
- where id = archive_id))
- from content_copies
- where status = 'present' and %s
- group by content_id
- having count(archive_id) < %%(retention_policy)s
- order by content_id
- limit %%(limit)s""" % last_content_clause
-
- cur = self._cursor(cur)
- cur.execute(query, vars)
- for content_id, present in cursor_to_bytes(cur):
- yield (content_id, present, {})
-
- @stored_procedure('swh_mktemp_content_archive')
- def mktemp_content_archive(self, cur=None):
- """Trigger the creation of the temporary table tmp_content_archive
- during the lifetime of the transaction.
-
- """
- pass
-
- @stored_procedure('swh_content_archive_add')
- def content_archive_add_from_temp(self, cur=None):
- """Add new content archive entries from temporary table.
-
- Use from archiver.storage module:
- self.db.mktemp_content_archive()
- # copy data over to the temp table
- self.db.copy_to([{'colname': id0}, {'colname': id1}],
- 'tmp_content_archive',
- ['colname'], cur)
- # insert into the main table
- self.db.content_archive_add_from_temp(cur)
-
- """
- pass
-
- def content_archive_get_missing(self, backend_name, cur=None):
- """Retrieve the content missing from backend_name.
-
- """
- cur = self._cursor(cur)
- cur.execute("select * from swh_content_archive_missing(%s)",
- (backend_name,))
- yield from cursor_to_bytes(cur)
-
- def content_archive_get_unknown(self, cur=None):
- """Retrieve unknown sha1 from archiver db.
-
- """
- cur = self._cursor(cur)
- cur.execute('select * from swh_content_archive_unknown()')
- yield from cursor_to_bytes(cur)
-
- def content_archive_update(self, content_id, archive_id,
- new_status=None, cur=None):
- """ Update the status of an archive content and set its mtime to now
-
- Change the mtime of an archived content for the given archive and set
- its mtime to the current time.
-
- Args:
- content_id (str): content sha1
- archive_id (str): name of the archive
- new_status (str): one of 'missing', 'present' or 'ongoing'.
- this status will replace the previous one. If not given,
- the function only changes the mtime of the content for the
- given archive.
- """
- assert isinstance(content_id, bytes)
- assert new_status is not None
-
- query = """insert into content_copies (archive_id, content_id, status, mtime)
- values ((select id from archive where name=%s),
- (select id from content where sha1=%s),
- %s, %s)
- on conflict (archive_id, content_id) do
- update set status = excluded.status, mtime = excluded.mtime
- """
-
- cur = self._cursor(cur)
- cur.execute(query, (archive_id, content_id, new_status, utcnow()))
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
deleted file mode 100644
index 20b36a36..00000000
--- a/swh/storage/archiver/director.py
+++ /dev/null
@@ -1,339 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import logging
-import sys
-import time
-
-import click
-
-from swh.core import config, utils
-from swh.model import hashutil
-from swh.objstorage import get_objstorage
-from swh.scheduler.utils import get_task
-
-from . import tasks # noqa
-from .storage import get_archiver_storage
-
-
-class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta):
- """Abstract Director class
-
- An archiver director is in charge of dispatching batch of
- contents to archiver workers (for them to archive).
-
- Inherit from this class and provide:
- - ADDITIONAL_CONFIG: Some added configuration needed for the
- director to work
- - CONFIG_BASE_FILENAME: relative path to lookup for the
- configuration file
- - def get_contents_to_archive(self): Implementation method to read
- contents to archive
-
- """
- DEFAULT_CONFIG = {
- 'batch_max_size': ('int', 1500),
- 'asynchronous': ('bool', True),
- 'max_queue_length': ('int', 100000),
- 'queue_throttling_delay': ('int', 120),
-
- 'archiver_storage': ('dict', {
- 'cls': 'db',
- 'args': {
- 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest',
- },
- }),
- }
-
- # Destined to be overridden by subclass
- ADDITIONAL_CONFIG = {}
-
- # We use the same configuration file as the worker
- CONFIG_BASE_FILENAME = 'archiver/worker'
-
- # The worker's task queue name to use
- TASK_NAME = None
-
- def __init__(self):
- """ Constructor of the archiver director.
-
- Args:
- db_conn_archiver: Either a libpq connection string,
- or a psycopg2 connection for the archiver db.
- config: optional additional configuration. Keys in the dict will
- override the one parsed from the configuration file.
- """
- super().__init__()
- self.config = self.parse_config_file(
- additional_configs=[self.ADDITIONAL_CONFIG])
- self.archiver_storage = get_archiver_storage(
- **self.config['archiver_storage'])
- self.task = get_task(self.TASK_NAME)
- self.max_queue_length = self.config['max_queue_length']
- self.throttling_delay = self.config['queue_throttling_delay']
-
- def run(self):
- """ Run the archiver director.
-
- The archiver director will check all the contents of the archiver
- database and do the required backup jobs.
- """
- if self.config['asynchronous']:
- run_fn = self.run_async_worker
- else:
- run_fn = self.run_sync_worker
-
- for batch in self.read_batch_contents():
- run_fn(batch)
-
- def run_async_worker(self, batch):
- """Produce a worker that will be added to the task queue.
-
- """
- max_length = self.max_queue_length
- throttling_delay = self.throttling_delay
-
- while True:
- length = self.task.app.get_queue_length(self.task.task_queue)
- if length >= max_length:
- logging.info(
- 'queue length %s >= %s, throttling for %s seconds' % (
- length,
- max_length,
- throttling_delay,
- )
- )
- time.sleep(throttling_delay)
- else:
- break
-
- self.task.delay(batch=batch)
-
- def run_sync_worker(self, batch):
- """Run synchronously a worker on the given batch.
-
- """
- self.task(batch=batch)
-
- def read_batch_contents(self):
- """ Create batch of contents that needs to be archived
-
- Yields:
- batch of sha1s that correspond to contents that need more archive
- copies.
- """
- contents = []
- for content in self.get_contents_to_archive():
- contents.append(content)
- if len(contents) > self.config['batch_max_size']:
- yield contents
- contents = []
- if len(contents) > 0:
- yield contents
-
- @abc.abstractmethod
- def get_contents_to_archive(self):
- """Retrieve generator of sha1 to archive
-
- Yields:
- sha1 to archive
-
- """
- pass
-
-
-class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase):
- """Process the files in order to know which ones need a backup.
-
- The archiver director processes the files in the local storage in order
- to know which ones need archival and delegates this task to
- archiver workers.
- """
-
- ADDITIONAL_CONFIG = {
- 'retention_policy': ('int', 2),
- }
-
- TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask'
-
- def __init__(self, start_id):
- super().__init__()
- if start_id is not None:
- self.start_id = hashutil.hash_to_bytes(start_id)
- else:
- self.start_id = None
-
- def get_contents_to_archive(self):
- """Create batch of contents that needs to be archived
-
- Yields:
- content_id (sha1) of contents that need more archive copies.
-
- """
- last_content = self.start_id
- while True:
- archiver_contents = list(
- self.archiver_storage.content_archive_get_unarchived_copies(
- last_content=last_content,
- retention_policy=self.config['retention_policy'],
- limit=self.config['batch_max_size']))
- if not archiver_contents:
- return
- for content_id, _, _ in archiver_contents:
- last_content = content_id
- yield content_id
-
-
-def read_sha1_from_stdin():
- """Read sha1 from stdin.
-
- """
- for line in sys.stdin:
- sha1 = line.strip()
- try:
- yield hashutil.hash_to_bytes(sha1)
- except Exception:
- print("%s is not a valid sha1 hash, continuing" % repr(sha1),
- file=sys.stderr)
- continue
-
-
-class ArchiverStdinToBackendDirector(ArchiverDirectorBase):
- """A cloud archiver director in charge of reading contents and sending
- them in batches to the cloud.
-
- The archiver director, in order:
- - Reads sha1 to send to a specific backend.
- - Checks if those sha1s are known in the archiver. If they are not,
- adds them
- - If the sha1s are missing, they are sent to the worker to archive
-
- If the flag force_copy is set, this will force the copy to be sent
- for archive even though it has already been done.
-
- """
- ADDITIONAL_CONFIG = {
- 'destination': ('str', 'azure'),
- 'force_copy': ('bool', False),
- 'source': ('str', 'uffizi'),
- 'storages': ('list[dict]',
- [
- {'host': 'uffizi',
- 'cls': 'pathslicing',
- 'args': {'root': '/tmp/softwareheritage/objects',
- 'slicing': '0:2/2:4/4:6'}},
- {'host': 'banco',
- 'cls': 'remote',
- 'args': {'base_url': 'http://banco:5003/'}}
- ])
- }
-
- CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
-
- TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask'
-
- def __init__(self):
- super().__init__()
- self.destination = self.config['destination']
- self.force_copy = self.config['force_copy']
- self.objstorages = {
- storage['host']: get_objstorage(storage['cls'], storage['args'])
- for storage in self.config.get('storages', [])
- }
- # Fallback objstorage
- self.source = self.config['source']
-
- def _add_unknown_content_ids(self, content_ids):
- """Check whether some content_id are unknown.
- If they are, add them to the archiver db.
-
- Args:
- content_ids: List of dict with one key content_id
-
- """
- source_objstorage = self.objstorages[self.source]
-
- self.archiver_storage.content_archive_add(
- (h
- for h in content_ids
- if h in source_objstorage),
- sources_present=[self.source])
-
- def get_contents_to_archive(self):
- gen_content_ids = (
- ids for ids in utils.grouper(read_sha1_from_stdin(),
- self.config['batch_max_size']))
-
- if self.force_copy:
- for content_ids in gen_content_ids:
- content_ids = list(content_ids)
-
- if not content_ids:
- continue
-
- # Add missing entries in archiver table
- self._add_unknown_content_ids(content_ids)
-
- print('Send %s contents to archive' % len(content_ids))
-
- for content_id in content_ids:
- # force its status to missing
- self.archiver_storage.content_archive_update(
- content_id, self.destination, 'missing')
- yield content_id
-
- else:
- for content_ids in gen_content_ids:
- content_ids = list(content_ids)
-
- # Add missing entries in archiver table
- self._add_unknown_content_ids(content_ids)
-
- # Filter already copied data
- content_ids = list(
- self.archiver_storage.content_archive_get_missing(
- content_ids=content_ids,
- backend_name=self.destination))
-
- if not content_ids:
- continue
-
- print('Send %s contents to archive' % len(content_ids))
-
- for content in content_ids:
- yield content
-
- def run_async_worker(self, batch):
- """Produce a worker that will be added to the task queue.
-
- """
- self.task.delay(destination=self.destination, batch=batch)
-
- def run_sync_worker(self, batch):
- """Run synchronously a worker on the given batch.
-
- """
- self.task(destination=self.destination, batch=batch)
-
-
-@click.command()
-@click.option('--direct', is_flag=True,
- help="""The archiver sends content for backup to
-one storage.""")
-@click.option('--start-id', default=None, help="The first id to process")
-def launch(direct, start_id):
- if direct:
- archiver = ArchiverStdinToBackendDirector()
- else:
- archiver = ArchiverWithRetentionPolicyDirector(start_id)
-
- archiver.run()
-
-
-if __name__ == '__main__':
- launch()
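
Per the ArchiverDirectorBase contract documented above, a concrete director only
has to name a task queue and yield sha1s; a minimal sketch (this subclass is
illustrative, not part of the removed code):

from swh.storage.archiver.director import (ArchiverDirectorBase,
                                            read_sha1_from_stdin)

class StdinRetentionDirector(ArchiverDirectorBase):
    ADDITIONAL_CONFIG = {}                    # nothing beyond DEFAULT_CONFIG
    CONFIG_BASE_FILENAME = 'archiver/worker'  # reuse the worker configuration
    TASK_NAME = ('swh.storage.archiver.tasks.'
                 'SWHArchiverWithRetentionPolicyTask')

    def get_contents_to_archive(self):
        # read_batch_contents() groups these sha1s into batches of
        # config['batch_max_size'] before dispatching them to workers.
        yield from read_sha1_from_stdin()

# StdinRetentionDirector().run()
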
diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py
deleted file mode 100644
index 97710582..00000000
--- a/swh/storage/archiver/storage.py
+++ /dev/null
@@ -1,361 +0,0 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import json
-import os
-import psycopg2
-import time
-
-from .db import ArchiverDb
-
-from swh.model import hashutil
-from swh.storage.common import db_transaction_generator, db_transaction
-from swh.storage.exc import StorageDBError
-
-
-class ArchiverStorage():
- """SWH Archiver storage proxy, encompassing DB
-
- """
- def __init__(self, dbconn):
- """
- Args:
- db_conn: either a libpq connection string, or a psycopg2 connection
-
- """
- try:
- if isinstance(dbconn, psycopg2.extensions.connection):
- self.db = ArchiverDb(dbconn)
- else:
- self.db = ArchiverDb.connect(dbconn)
- except psycopg2.OperationalError as e:
- raise StorageDBError(e)
-
- @db_transaction_generator
- def archive_ls(self, cur=None):
- """ Get all the archives registered on the server.
-
- Yields:
- a tuple (server_id, server_url) for each archive server.
- """
- yield from self.db.archive_ls(cur)
-
- @db_transaction
- def content_archive_get(self, content_id, cur=None):
- """ Get the archival status of a content.
-
- Retrieve from the database the archival status of the given content
-
- Args:
- content_id: the sha1 of the content
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
- """
- return self.db.content_archive_get(content_id, cur)
-
- @db_transaction_generator
- def content_archive_get_copies(self, last_content=None, limit=1000,
- cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from self.db.content_archive_get_copies(last_content, limit,
- cur)
-
- @db_transaction_generator
- def content_archive_get_unarchived_copies(
- self, retention_policy, last_content=None,
- limit=1000, cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`. Yields only copies with a number of present
- copies smaller than `retention_policy`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- retention_policy: number of required present copies
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from self.db.content_archive_get_unarchived_copies(
- retention_policy, last_content, limit, cur)
-
- @db_transaction_generator
- def content_archive_get_missing(self, content_ids, backend_name, cur=None):
- """Retrieve missing sha1s from backend_name.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
- backend_name (str): Name of the backend to check for content
-
- Yields:
- missing sha1s from backend_name
-
- """
- db = self.db
-
- db.mktemp_content_archive()
-
- db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur)
-
- for content_id in db.content_archive_get_missing(backend_name, cur):
- yield content_id[0]
-
- @db_transaction_generator
- def content_archive_get_unknown(self, content_ids, cur=None):
- """Retrieve unknown sha1s from content_archive.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
-
- Yields:
- Unknown sha1s from content_archive
-
- """
- db = self.db
-
- db.mktemp_content_archive()
-
- db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur)
-
- for content_id in db.content_archive_get_unknown(cur):
- yield content_id[0]
-
- @db_transaction
- def content_archive_update(self, content_id, archive_id,
- new_status=None, cur=None):
- """ Update the status of an archive content and set its mtime to now
-
- Change the mtime of an archived content for the given archive and set
- its mtime to the current time.
-
- Args:
- content_id (str): content sha1
- archive_id (str): name of the archive
- new_status (str): one of 'missing', 'present' or 'ongoing'.
- this status will replace the previous one. If not given,
- the function only changes the mtime of the content for the
- given archive.
- """
- self.db.content_archive_update(content_id, archive_id, new_status, cur)
-
- @db_transaction
- def content_archive_add(
- self, content_ids, sources_present, cur=None):
- """Insert a new entry in db about content_id.
-
- Args:
- content_ids ([bytes|str]): content identifiers
- sources_present ([str]): List of source names where
- contents are present
- """
- db = self.db
-
- # Prepare copies dictionary
- copies = {}
- for source in sources_present:
- copies[source] = {
- "status": "present",
- "mtime": int(time.time()),
- }
-
- copies = json.dumps(copies)
- num_present = len(sources_present)
-
- db.mktemp('content_archive')
- db.copy_to(
- ({'content_id': id,
- 'copies': copies,
- 'num_present': num_present}
- for id in content_ids),
- 'tmp_content_archive',
- ['content_id', 'copies', 'num_present'],
- cur)
- db.content_archive_add_from_temp(cur)
-
-
-class StubArchiverStorage():
- def __init__(self, archives, present, missing, logfile_base):
- """
- A stub storage for the archiver that doesn't write to disk
-
- Args:
- - archives: a dictionary mapping archive names to archive URLs
- - present: archives where the objects are all considered present
- - missing: archives where the objects are all considered missing
- - logfile_base: basename for the logfile
- """
- self.archives = archives
- self.present = set(present)
- self.missing = set(missing)
- if set(archives) != self.present | self.missing:
- raise ValueError("Present and missing archives don't match")
- self.logfile_base = logfile_base
- self.__logfile = None
-
- def open_logfile(self):
- if self.__logfile:
- return
-
- logfile_name = "%s.%d" % (self.logfile_base, os.getpid())
- self.__logfile = open(logfile_name, 'a')
-
- def close_logfile(self):
- if not self.__logfile:
- return
-
- self.__logfile.close()
- self.__logfile = None
-
- def archive_ls(self, cur=None):
- """ Get all the archives registered on the server.
-
- Yields:
- a tuple (server_id, server_url) for each archive server.
- """
- yield from self.archives.items()
-
- def content_archive_get(self, content_id, cur=None):
- """ Get the archival status of a content.
-
- Retrieve from the database the archival status of the given content
-
- Args:
- content_id: the sha1 of the content
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
- """
- return (content_id, self.present, {})
-
- def content_archive_get_copies(self, last_content=None, limit=1000,
- cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from []
-
- def content_archive_get_unarchived_copies(self, retention_policy,
- last_content=None, limit=1000,
- cur=None):
- """ Get the list of copies for `limit` contents starting after
- `last_content`. Yields only copies with a number of present
- copies smaller than `retention_policy`.
-
- Args:
- last_content: sha1 of the last content retrieved. May be None
- to start at the beginning.
- retention_policy: number of required present copies
- limit: number of contents to retrieve. Can be None to retrieve all
- objects (will be slow).
-
- Yields:
- A tuple (content_id, present_copies, ongoing_copies), where
- ongoing_copies is a dict mapping copy to mtime.
-
- """
- yield from []
-
- def content_archive_get_missing(self, content_ids, backend_name, cur=None):
- """Retrieve missing sha1s from backend_name.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
- backend_name (str): Name of the backend to check for content
-
- Yields:
- missing sha1s from backend_name
-
- """
- if backend_name in self.missing:
- yield from content_ids
- elif backend_name in self.present:
- yield from []
- else:
- raise ValueError('Unknown backend `%s`' % backend_name)
-
- def content_archive_get_unknown(self, content_ids, cur=None):
- """Retrieve unknown sha1s from content_archive.
-
- Args:
- content_ids ([sha1s]): list of sha1s to test
-
- Yields:
- Unknown sha1s from content_archive
-
- """
- yield from []
-
- def content_archive_update(self, content_id, archive_id,
- new_status=None, cur=None):
- """ Update the status of an archive content and set its mtime to now
-
- Change the mtime of an archived content for the given archive and set
- its mtime to the current time.
-
- Args:
- content_id (str): content sha1
- archive_id (str): name of the archive
- new_status (str): one of 'missing', 'present' or 'ongoing'.
- this status will replace the previous one. If not given,
- the function only changes the mtime of the content for the
- given archive.
- """
- if not self.__logfile:
- self.open_logfile()
-
- print(time.time(), archive_id, new_status,
- hashutil.hash_to_hex(content_id), file=self.__logfile)
-
- def content_archive_add(
- self, content_ids, sources_present, cur=None):
- """Insert a new entry in db about content_id.
-
- Args:
- content_ids ([bytes|str]): content identifiers
- sources_present ([str]): List of source names where
- contents are present
- """
- pass
-
-
-def get_archiver_storage(cls, args):
- """Instantiate an archiver database with the proper class and arguments"""
- if cls == 'db':
- return ArchiverStorage(**args)
- elif cls == 'stub':
- return StubArchiverStorage(**args)
- else:
- raise ValueError('Unknown Archiver Storage class `%s`' % cls)
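
A minimal sketch of the factory above using the 'stub' backend (archive names,
URLs and logfile path are illustrative):

from swh.storage.archiver.storage import get_archiver_storage

archiver_storage = get_archiver_storage('stub', {
    'archives': {'uffizi': 'http://uffizi:5003/',
                 'banco': 'http://banco:5003/'},
    'present': ['uffizi'],
    'missing': ['banco'],
    'logfile_base': '/tmp/archiver-stub-log',
})

# Everything is reported missing from 'banco' and present on 'uffizi':
missing = list(archiver_storage.content_archive_get_missing([b'\x00' * 20],
                                                             'banco'))
assert missing == [b'\x00' * 20]
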
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
deleted file mode 100644
index ccb0a2f6..00000000
--- a/swh/storage/archiver/tasks.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from swh.scheduler.task import Task
-from .worker import ArchiverWithRetentionPolicyWorker
-from .worker import ArchiverToBackendWorker
-
-
-class SWHArchiverWithRetentionPolicyTask(Task):
- """Main task that archive a batch of content.
-
- """
- task_queue = 'swh_storage_archive_worker'
-
- def run_task(self, *args, **kwargs):
- ArchiverWithRetentionPolicyWorker(*args, **kwargs).run()
-
-
-class SWHArchiverToBackendTask(Task):
- """Main task that archive a batch of content in the cloud.
-
- """
- task_queue = 'swh_storage_archive_worker_to_backend'
-
- def run_task(self, *args, **kwargs):
- ArchiverToBackendWorker(*args, **kwargs).run()
diff --git a/swh/storage/archiver/updater.py b/swh/storage/archiver/updater.py
deleted file mode 100644
index 64cf1cea..00000000
--- a/swh/storage/archiver/updater.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright (C) 2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import logging
-
-from swh.journal.client import SWHJournalClient
-
-from .storage import get_archiver_storage
-
-
-class SWHArchiverContentUpdater(SWHJournalClient):
- """Client in charge of updating new contents in the content_archiver
- db.
-
- This is a swh.journal client only dealing with contents.
-
- """
- CONFIG_BASE_FILENAME = 'archiver/content_updater'
-
- ADDITIONAL_CONFIG = {
- 'archiver_storage': (
- 'dict', {
- 'cls': 'db',
- 'args': {
- 'dbconn': 'dbname=softwareheritage-archiver-dev '
- 'user=guest',
- }
- }),
- 'sources_present': ('list[str]', ['uffizi'])
- }
-
- def __init__(self):
- # Only interested in content here so override the configuration
- super().__init__(extra_configuration={'object_types': ['content']})
-
- self.sources_present = self.config['sources_present']
-
- self.archiver_storage = get_archiver_storage(
- **self.config['archiver_storage'])
-
- def process_objects(self, messages):
- self.archiver_storage.content_archive_add(
- (c[b'sha1'] for c in messages['content']),
- self.sources_present)
-
-
-if __name__ == '__main__':
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s %(process)d %(levelname)s %(message)s'
- )
-
- content_updater = SWHArchiverContentUpdater()
- content_updater.process()
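
The process_objects() hook above receives swh.journal messages grouped by object
type; a sketch of the expected input shape (the sha1s are illustrative):

messages = {
    'content': [
        {b'sha1': bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689')},
        {b'sha1': bytes.fromhex('8c206a1a87599f532ce68675536f0b1546900d7a')},
    ],
}
# SWHArchiverContentUpdater().process_objects(messages) would register these
# sha1s as present in the archives listed in 'sources_present'.
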
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
deleted file mode 100644
index c94d6f15..00000000
--- a/swh/storage/archiver/worker.py
+++ /dev/null
@@ -1,429 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import datetime
-import logging
-import random
-
-from collections import defaultdict
-from celery import group
-
-from swh.core import config, utils
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.model import hashutil
-from swh.scheduler.utils import get_task
-
-from .storage import get_archiver_storage
-from .copier import ArchiverCopier
-
-
-logger = logging.getLogger('archiver.worker')
-
-
-class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta):
- """Base archive worker.
-
- Inherit from this class and override:
- - ADDITIONAL_CONFIG: Some added configuration needed for the
- director to work
- - CONFIG_BASE_FILENAME: relative path to lookup for the
- configuration file
- - def need_archival(self, content_data): Determine if a content
- needs archival or not
- - def choose_backup_servers(self, present, missing): Choose
- which backup server to send copies to
-
- """
- DEFAULT_CONFIG = {
- 'archiver_storage': ('dict', {
- 'cls': 'db',
- 'args': {
- 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest',
- },
- }),
- 'storages': ('list[dict]',
- [
- {'host': 'uffizi',
- 'cls': 'pathslicing',
- 'args': {'root': '/tmp/softwareheritage/objects',
- 'slicing': '0:2/2:4/4:6'}},
- {'host': 'banco',
- 'cls': 'remote',
- 'args': {'base_url': 'http://banco:5003/'}}
- ])
- }
-
- ADDITIONAL_CONFIG = {}
-
- CONFIG_BASE_FILENAME = 'archiver/worker'
-
- objstorages = {}
-
- def __init__(self, batch):
- super().__init__()
- self.config = self.parse_config_file(
- additional_configs=[self.ADDITIONAL_CONFIG])
- self.batch = batch
- self.archiver_db = get_archiver_storage(
- **self.config['archiver_storage'])
- self.objstorages = {
- storage['host']: get_objstorage(storage['cls'], storage['args'])
- for storage in self.config.get('storages', [])
- }
- self.set_objstorages = set(self.objstorages)
-
- def run(self):
- """Do the task expected from the archiver worker.
-
- Process the contents in self.batch, ensure that the elements
- still need an archival (using archiver db), and spawn copiers
- to copy files in each destination according to the
- archiver-worker's policy.
-
- """
- transfers = defaultdict(list)
- for obj_id in self.batch:
- # Get dict {'missing': [servers], 'present': [servers]}
- # for contents ignoring those who don't need archival.
- copies = self.compute_copies(self.set_objstorages, obj_id)
- if not copies: # cannot happen if using the .director module
- msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id)
- logger.warning(msg)
- continue
-
- if not self.need_archival(copies):
- continue
-
- present = copies.get('present', set())
- missing = copies.get('missing', set())
- if len(present) == 0:
- msg = 'Lost content %s' % hashutil.hash_to_hex(obj_id)
- logger.critical(msg)
- continue
-
- # Choose servers to be used as srcs and dests.
- for src_dest in self.choose_backup_servers(present, missing):
- transfers[src_dest].append(obj_id)
-
- # Then run copiers for each of the required transfers.
- contents_copied = []
- for (src, dest), content_ids in transfers.items():
- contents_copied.extend(self.run_copier(src, dest, content_ids))
-
- # copy is done, eventually do something else with them
- self.copy_finished(contents_copied)
-
- def compute_copies(self, set_objstorages, content_id):
- """From a content_id, return present and missing copies.
-
- Args:
- objstorages (set): objstorage's id name
- content_id: the content concerned
-
- Returns:
- A dictionary with the following keys:
- - 'present': set of archives where the content is present
- - 'missing': set of archives where the content is missing
- - 'ongoing': ongoing copies: dict mapping the archive id
- with the time the copy supposedly started.
- """
- result = self.archiver_db.content_archive_get(content_id)
- if not result:
- return None
- _, present, ongoing = result
- set_present = set_objstorages & set(present)
- set_ongoing = set_objstorages & set(ongoing)
- set_missing = set_objstorages - set_present - set_ongoing
- return {
- 'present': set_present,
- 'missing': set_missing,
- 'ongoing': {archive: value
- for archive, value in ongoing.items()
- if archive in set_ongoing},
- }
-
- def run_copier(self, source, destination, content_ids):
- """Run a copier in order to archive the given contents.
-
- Upload the given contents from the source to the destination.
- If the process fails, the whole batch is considered uncopied
- and remains 'ongoing', waiting to be rescheduled after a
- delay.
-
- Args:
- source (str): source storage's identifier
- destination (str): destination storage's identifier
- content_ids ([sha1]): list of content ids to archive.
-
- """
- # Check if there are any errors among the contents.
- content_status = self.get_contents_error(content_ids, source)
-
- # Iterates over the error detected.
- for content_id, real_status in content_status.items():
- # Remove them from the to-archive list,
- # as they cannot be retrieved correctly.
- content_ids.remove(content_id)
- # Update their status to reflect their real state.
- self.archiver_db.content_archive_update(
- content_id, archive_id=source, new_status=real_status)
-
- # Now perform the copy on the remaining contents
- ac = ArchiverCopier(
- source=self.objstorages[source],
- destination=self.objstorages[destination],
- content_ids=content_ids)
-
- if ac.run():
- # Once the archival complete, update the database.
- for content_id in content_ids:
- self.archiver_db.content_archive_update(
- content_id, archive_id=destination, new_status='present')
-
- return content_ids
- return []
-
- def copy_finished(self, content_ids):
- """Hook to notify the content_ids archive copy is finished.
- (This is not an abstract method as this is optional
- """
- pass
-
- def get_contents_error(self, content_ids, source_storage):
- """Indicates what is the error associated to a content when needed
-
- Check the given content on the given storage. If an error is detected,
- it will be reported through the returned dict.
-
- Args:
- content_ids ([sha1]): list of content ids to check
- source_storage (str): the source storage holding the
- contents to check.
-
- Returns:
- a dict mapping {content_id -> error_status} for each content_id
- with an error. The `error_status` result may be 'missing' or
- 'corrupted'.
-
- """
- content_status = {}
- storage = self.objstorages[source_storage]
- for content_id in content_ids:
- try:
- storage.check(content_id)
- except Error:
- content_status[content_id] = 'corrupted'
- logger.error('%s corrupted!' % hashutil.hash_to_hex(
- content_id))
- except ObjNotFoundError:
- content_status[content_id] = 'missing'
- logger.error('%s missing!' % hashutil.hash_to_hex(content_id))
-
- return content_status
-
- @abc.abstractmethod
- def need_archival(self, content_data):
- """Indicate if the content needs to be archived.
-
- Args:
- content_data (dict): dict that contains two lists 'present' and
- 'missing' with copy ids corresponding to this status.
-
- Returns:
- True if there are not enough copies, False otherwise.
-
- """
- pass
-
- @abc.abstractmethod
- def choose_backup_servers(self, present, missing):
- """Choose and yield the required amount of couple source/destination
-
- For each required copy, choose a unique destination server
- among the missing copies and a source server among the
- present ones.
-
- Args:
- present: set of source objstorage names where the
- content is present
- missing: set of destination objstorage names where the
- content is missing
-
- Yields:
- tuple (source (str), destination (str)) for each required copy.
-
- """
- pass
-
-
-class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker):
- """ Do the required backups on a given batch of contents.
-
- Process the contents of a batch in order to do the needed backups on
- the slave servers.
- """
-
- ADDITIONAL_CONFIG = {
- 'retention_policy': ('int', 2),
- 'archival_max_age': ('int', 3600),
- 'sources': ('list[str]', ['uffizi', 'banco']),
- }
-
- def __init__(self, batch):
- """ Constructor of the ArchiverWorker class.
-
- Args:
- batch: list of objects' sha1 hashes that potentially need archival.
- """
- super().__init__(batch)
- config = self.config
- self.retention_policy = config['retention_policy']
- self.archival_max_age = config['archival_max_age']
- self.sources = config['sources']
-
- if len(self.objstorages) < self.retention_policy:
- raise ValueError('Retention policy is too high for the number of '
- 'provided servers')
-
- def need_archival(self, content_data):
- """ Indicate if the content need to be archived.
-
- Args:
- content_data (dict): dict that contains two lists 'present' and
- 'missing' with copy ids corresponding to this status.
- Returns: True if there are not enough copies, False otherwise.
- """
- nb_presents = len(content_data.get('present', []))
- for copy, mtime in content_data.get('ongoing', {}).items():
- if not self._is_archival_delay_elapsed(mtime):
- nb_presents += 1
- return nb_presents < self.retention_policy
-
- def _is_archival_delay_elapsed(self, start_time):
- """ Indicates if the archival delay is elapsed given the start_time
-
- Args:
- start_time (float): time at which the archival started.
-
- Returns:
- True if the archival delay has elapsed, False otherwise.
- """
- elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - start_time
- return elapsed > datetime.timedelta(seconds=self.archival_max_age)
-
- def choose_backup_servers(self, present, missing):
- """Choose and yield the required amount of couple source/destination
-
- For each required copy, choose a unique destination server
- among the missing copies and a source server among the
- present ones.
-
- Each destination server is unique so after archival, the
- retention policy requirement will be fulfilled. However, the
- source server may be used multiple times.
-
- Args:
- present: set of source objstorage names where the
- content is present
- missing: set of destination objstorage names where the
- content is missing
-
- Yields:
- tuple (source, destination) for each required copy.
-
- """
- # Transform from set to list to allow random selections
- missing = list(missing)
- present = list(present)
- all_sources = [source for source in present if source in self.sources]
- nb_required = self.retention_policy - len(present)
- destinations = random.sample(missing, nb_required)
- sources = [random.choice(all_sources) for dest in destinations]
- yield from zip(sources, destinations)
-
-
-class ArchiverToBackendWorker(BaseArchiveWorker):
- """Worker that sends copies over from a source to another backend.
-
- Process the contents of a batch, copying them from the source
- objstorage to the destination objstorage.
-
- """
-
- CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
-
- ADDITIONAL_CONFIG = {
- 'next_task': (
- 'dict', {
- 'queue': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask',
- 'batch_size': 10,
- }
- )
- }
-
- def __init__(self, destination, batch):
- """Constructor of the ArchiverWorkerToBackend class.
-
- Args:
- destination: where to copy the objects to
- batch: sha1s to send to destination
-
- """
- super().__init__(batch)
- self.destination = destination
- next_task = self.config['next_task']
- if next_task:
- destination_queue = next_task['queue']
- self.task_destination = get_task(destination_queue)
- self.batch_size = int(next_task['batch_size'])
- else:
- self.task_destination = self.batch_size = None
-
- def need_archival(self, content_data):
- """Indicate if the content needs to be archived.
-
- Args:
- content_data (dict): dict that contains 3 lists 'present',
- 'ongoing' and 'missing' with copy ids corresponding to
- this status.
-
- Returns:
- True if we need to archive, False otherwise
-
- """
- return self.destination in content_data.get('missing', {})
-
- def choose_backup_servers(self, present, missing):
- """The destination is fixed to the destination mentioned.
-
- The only variable here is the source of information that we
- choose randomly in 'present'.
-
- Args:
- present: set of source objstorage names where the
- content is present
- missing: set of destination objstorage names where the
- content is missing
-
- Yields:
- tuple (source, destination) for each required copy.
-
- """
- yield (random.choice(list(present)), self.destination)
-
- def copy_finished(self, content_ids):
- """Once the copy is finished, we'll send those batch of contents as
- done in the destination queue.
-
- """
- if self.task_destination:
- groups = []
- for ids in utils.grouper(content_ids, self.batch_size):
- sig_ids = self.task_destination.s(list(ids))
- groups.append(sig_ids)
-
- group(groups).delay()
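For context, here is a minimal standalone sketch of the pair-selection logic that the deleted worker implemented in choose_backup_servers(). choose_backup_pairs, retention_policy and allowed_sources are illustrative names, not part of the swh.storage API; the sketch only assumes storage names are plain strings.

import random

def choose_backup_pairs(present, missing, retention_policy, allowed_sources):
    """Yield (source, destination) pairs until the retention policy is met.

    present/missing are sets of storage names; each destination is unique,
    while a source may be reused, mirroring the deleted worker's behaviour.
    """
    sources = [name for name in present if name in allowed_sources]
    # Never ask for more destinations than are actually available.
    nb_required = min(retention_policy - len(present), len(missing))
    if nb_required <= 0 or not sources:
        return
    # random.sample needs a sequence, hence sorted() over the set.
    for destination in random.sample(sorted(missing), nb_required):
        yield random.choice(sources), destination

# Example: one more copy is needed to reach a retention policy of 2.
# list(choose_backup_pairs({'uffizi'}, {'banco', 's3'}, 2, ['uffizi']))
# -> e.g. [('uffizi', 'banco')]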
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
deleted file mode 100644
index 036d2121..00000000
--- a/swh/storage/tests/test_archiver.py
+++ /dev/null
@@ -1,486 +0,0 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import glob
-import tempfile
-import shutil
-import unittest
-import os
-
-from nose.tools import istest
-from nose.plugins.attrib import attr
-
-from swh.core.tests.db_testing import DbsTestFixture
-
-from swh.storage.archiver.storage import get_archiver_storage
-
-from swh.storage.archiver import ArchiverWithRetentionPolicyDirector
-from swh.storage.archiver import ArchiverWithRetentionPolicyWorker
-from swh.storage.archiver.db import utcnow
-
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError
-
-try:
- # objstorage > 0.17
- from swh.objstorage.api.server import make_app as app
- from server_testing import ServerTestFixtureAsync as ServerTestFixture
- MIGRATED = True
-except ImportError:
- # objstorage <= 0.17
- from swh.objstorage.api.server import app
- from server_testing import ServerTestFixture
- MIGRATED = False
-
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
-
-
-@attr('db')
-class TestArchiver(DbsTestFixture, ServerTestFixture,
- unittest.TestCase):
- """ Test the objstorage archiver.
- """
-
- TEST_DB_NAMES = [
- 'softwareheritage-archiver-test',
- ]
- TEST_DB_DUMPS = [
- os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'),
- ]
- TEST_DB_DUMP_TYPES = [
- 'pg_dump',
- ]
-
- def setUp(self):
- # Launch the backup server
- self.dest_root = tempfile.mkdtemp(prefix='remote')
- self.config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.dest_root,
- 'slicing': '0:2/2:4/4:6',
- }
- }
- if MIGRATED:
- self.app = app(self.config)
- else:
- self.app = app
- super().setUp()
-
- # Retrieve connection (depends on the order in TEST_DB_NAMES)
- self.conn = self.conns[0] # archiver db's connection
- self.cursor = self.cursors[0]
-
- # Create source storage
- self.src_root = tempfile.mkdtemp()
- src_config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.src_root,
- 'slicing': '0:2/2:4/4:6'
- }
- }
- self.src_storage = get_objstorage(**src_config)
-
- # Create destination storage
- dest_config = {
- 'cls': 'remote',
- 'args': {
- 'url': self.url()
- }
- }
- self.dest_storage = get_objstorage(**dest_config)
-
- # Keep mapped the id to the storages
- self.storages = {
- 'uffizi': self.src_storage,
- 'banco': self.dest_storage
- }
-
- # Override configurations
- src_archiver_conf = {'host': 'uffizi'}
- dest_archiver_conf = {'host': 'banco'}
- src_archiver_conf.update(src_config)
- dest_archiver_conf.update(dest_config)
- self.archiver_storages = [src_archiver_conf, dest_archiver_conf]
- self._override_director_config()
- self._override_worker_config()
- # Create the base archiver
- self.archiver = self._create_director()
-
- def tearDown(self):
- self.empty_tables()
- shutil.rmtree(self.src_root)
- shutil.rmtree(self.dest_root)
- super().tearDown()
-
- def empty_tables(self):
- # Remove all content
- self.cursor.execute('DELETE FROM content')
- self.cursor.execute('DELETE FROM content_copies')
- self.conn.commit()
-
- def _override_director_config(self, retention_policy=2):
- """ Override the default config of the Archiver director
- to allow the tests to use the *-test db instead of the default one as
- there is no configuration file for now.
- """
- ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa
- 'archiver_storage': {
- 'cls': 'db',
- 'args': {
- 'dbconn': self.conn,
- },
- },
- 'batch_max_size': 5000,
- 'archival_max_age': 3600,
- 'retention_policy': retention_policy,
- 'asynchronous': False,
- 'max_queue_length': 100000,
- 'queue_throttling_delay': 120,
- }
-
- def _override_worker_config(self):
- """ Override the default config of the Archiver worker
- to allow the tests to use the *-test db instead of the default one as
- there is no configuration file for now.
- """
- ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa
- 'retention_policy': 2,
- 'archival_max_age': 3600,
- 'archiver_storage': {
- 'cls': 'db',
- 'args': {
- 'dbconn': self.conn,
- },
- },
- 'storages': self.archiver_storages,
- 'source': 'uffizi',
- 'sources': ['uffizi'],
- }
-
- def _create_director(self):
- return ArchiverWithRetentionPolicyDirector(start_id=None)
-
- def _create_worker(self, batch={}):
- return ArchiverWithRetentionPolicyWorker(batch)
-
- def _add_content(self, storage_name, content_data):
- """ Add really a content to the given objstorage
-
- This put an empty status for the added content.
-
- Args:
- storage_name: the concerned storage
- content_data: the data to insert
- with_row_insert: to insert a row entry in the db or not
-
- """
- # Add the content to the storage
- obj_id = self.storages[storage_name].add(content_data)
- self.cursor.execute(""" INSERT INTO content (sha1)
- VALUES (%s)
- """, (obj_id,))
- return obj_id
-
- def _update_status(self, obj_id, storage_name, status, date=None):
- """ Update the db status for the given id/storage_name.
-
- This does not create the content in the storage.
- """
- self.cursor.execute("""insert into archive (name)
- values (%s)
- on conflict do nothing""", (storage_name,))
-
- self.archiver.archiver_storage.content_archive_update(
- obj_id, storage_name, status
- )
-
- # Integration test
- @istest
- def archive_missing_content(self):
- """ Run archiver on a missing content should archive it.
- """
- obj_data = b'archive_missing_content'
- obj_id = self._add_content('uffizi', obj_data)
- self._update_status(obj_id, 'uffizi', 'present')
- # Content is missing on banco (entry not present in the db)
- try:
- self.dest_storage.get(obj_id)
- except ObjNotFoundError:
- pass
- else:
- self.fail('Content should not be present before archival')
- self.archiver.run()
- # now the content should be present on remote objstorage
- remote_data = self.dest_storage.get(obj_id)
- self.assertEquals(obj_data, remote_data)
-
- @istest
- def archive_present_content(self):
- """ A content that is not 'missing' shouldn't be archived.
- """
- obj_id = self._add_content('uffizi', b'archive_present_content')
- self._update_status(obj_id, 'uffizi', 'present')
- self._update_status(obj_id, 'banco', 'present')
- # After the run, the content should NOT be in the archive,
- # as the archiver believes it is already there.
- self.archiver.run()
- with self.assertRaises(ObjNotFoundError):
- self.dest_storage.get(obj_id)
-
- @istest
- def archive_already_enough(self):
- """ A content missing with enough copies shouldn't be archived.
- """
- obj_id = self._add_content('uffizi', b'archive_alread_enough')
- self._update_status(obj_id, 'uffizi', 'present')
- self._override_director_config(retention_policy=1)
- director = self._create_director()
- # Obj is present in only one archive but only one copy is required.
- director.run()
- with self.assertRaises(ObjNotFoundError):
- self.dest_storage.get(obj_id)
-
- @istest
- def content_archive_get_copies(self):
- self.assertCountEqual(
- self.archiver.archiver_storage.content_archive_get_copies(),
- [],
- )
- obj_id = self._add_content('uffizi', b'archive_alread_enough')
- self._update_status(obj_id, 'uffizi', 'present')
- self.assertCountEqual(
- self.archiver.archiver_storage.content_archive_get_copies(),
- [(obj_id, ['uffizi'], {})],
- )
-
- # Unit tests for archive worker
-
- def archival_elapsed(self, mtime):
- return self._create_worker()._is_archival_delay_elapsed(mtime)
-
- @istest
- def vstatus_ongoing_remaining(self):
- self.assertFalse(self.archival_elapsed(utcnow()))
-
- @istest
- def vstatus_ongoing_elapsed(self):
- past_time = (utcnow()
- - datetime.timedelta(
- seconds=self._create_worker().archival_max_age))
- self.assertTrue(self.archival_elapsed(past_time))
-
- @istest
- def need_archival_missing(self):
- """ A content should need archival when it is missing.
- """
- status_copies = {'present': ['uffizi'], 'missing': ['banco']}
- worker = self._create_worker()
- self.assertEqual(worker.need_archival(status_copies),
- True)
-
- @istest
- def need_archival_present(self):
- """ A content present everywhere shouldn't need archival
- """
- status_copies = {'present': ['uffizi', 'banco']}
- worker = self._create_worker()
- self.assertEqual(worker.need_archival(status_copies),
- False)
-
- def _compute_copies_status(self, status):
- """ A content with a given status should be detected correctly
- """
- obj_id = self._add_content(
- 'banco', b'compute_copies_' + bytes(status, 'utf8'))
- self._update_status(obj_id, 'banco', status)
- worker = self._create_worker()
- self.assertIn('banco', worker.compute_copies(
- set(worker.objstorages), obj_id)[status])
-
- @istest
- def compute_copies_present(self):
- """ A present content should be detected with correct status
- """
- self._compute_copies_status('present')
-
- @istest
- def compute_copies_missing(self):
- """ A missing content should be detected with correct status
- """
- self._compute_copies_status('missing')
-
- @istest
- def compute_copies_extra_archive(self):
- obj_id = self._add_content('banco', b'foobar')
- self._update_status(obj_id, 'banco', 'present')
- self._update_status(obj_id, 'random_archive', 'present')
- worker = self._create_worker()
- copies = worker.compute_copies(set(worker.objstorages), obj_id)
- self.assertEqual(copies['present'], {'banco'})
- self.assertEqual(copies['missing'], {'uffizi'})
-
- def _get_backups(self, present, missing):
- """ Return a list of the pair src/dest from the present and missing
- """
- worker = self._create_worker()
- return list(worker.choose_backup_servers(present, missing))
-
- @istest
- def choose_backup_servers(self):
- self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0)
- self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1)
- # Even with more possible destinations, do not take more than the
- # retention_policy requires
- self.assertEqual(
- len(self._get_backups(['uffizi'], ['banco', 's3'])),
- 1
- )
-
-
-class TestArchiverStorageStub(unittest.TestCase):
- def setUp(self):
- self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local')
- self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote')
- self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log')
-
- src_config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.src_root,
- 'slicing': '0:2/2:4/4:6'
- }
- }
- self.src_storage = get_objstorage(**src_config)
-
- # Create destination storage
- dest_config = {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.dest_root,
- 'slicing': '0:2/2:4/4:6'
- }
- }
- self.dest_storage = get_objstorage(**dest_config)
-
- self.config = {
- 'cls': 'stub',
- 'args': {
- 'archives': {
- 'present_archive': 'http://uffizi:5003',
- 'missing_archive': 'http://banco:5003',
- },
- 'present': ['present_archive'],
- 'missing': ['missing_archive'],
- 'logfile_base': os.path.join(self.log_root, 'log_'),
- }
- }
-
- # Generated with:
- #
- # id_length = 20
- # random.getrandbits(8 * id_length).to_bytes(id_length, 'big')
- #
- self.content_ids = [
- b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f",
- b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d',
- b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83',
- b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2',
- b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc',
- ]
-
- self.archiver_storage = get_archiver_storage(**self.config)
- super().setUp()
-
- def tearDown(self):
- shutil.rmtree(self.src_root)
- shutil.rmtree(self.dest_root)
- shutil.rmtree(self.log_root)
- super().tearDown()
-
- @istest
- def archive_ls(self):
- self.assertCountEqual(
- self.archiver_storage.archive_ls(),
- self.config['args']['archives'].items()
- )
-
- @istest
- def content_archive_get(self):
- for content_id in self.content_ids:
- self.assertEqual(
- self.archiver_storage.content_archive_get(content_id),
- (content_id, set(self.config['args']['present']), {}),
- )
-
- @istest
- def content_archive_get_copies(self):
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_copies(),
- [],
- )
-
- @istest
- def content_archive_get_unarchived_copies(self):
- retention_policy = 2
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_unarchived_copies(
- retention_policy),
- [],
- )
-
- @istest
- def content_archive_get_missing(self):
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_missing(
- self.content_ids,
- 'missing_archive'
- ),
- self.content_ids,
- )
-
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_missing(
- self.content_ids,
- 'present_archive'
- ),
- [],
- )
-
- with self.assertRaises(ValueError):
- list(self.archiver_storage.content_archive_get_missing(
- self.content_ids,
- 'unknown_archive'
- ))
-
- @istest
- def content_archive_get_unknown(self):
- self.assertCountEqual(
- self.archiver_storage.content_archive_get_unknown(
- self.content_ids,
- ),
- [],
- )
-
- @istest
- def content_archive_update(self):
- for content_id in self.content_ids:
- self.archiver_storage.content_archive_update(
- content_id, 'present_archive', 'present')
- self.archiver_storage.content_archive_update(
- content_id, 'missing_archive', 'present')
-
- self.archiver_storage.close_logfile()
-
- # Make sure we created a logfile
- files = glob.glob('%s*' % self.config['args']['logfile_base'])
- self.assertEqual(len(files), 1)
-
- # make sure the logfile contains all our lines
- lines = open(files[0]).readlines()
- self.assertEqual(len(lines), 2 * len(self.content_ids))
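For reference, a minimal sketch of the present/missing/ongoing classification that the removed compute_copies() helper performed and that the tests above exercise. split_copies is an illustrative name and the argument shapes are assumptions based on the deleted code.

def split_copies(configured, present, ongoing):
    """Classify the configured objstorages for one content.

    configured: set of objstorage names known to the worker
    present: iterable of names where the content is already archived
    ongoing: dict {name: copy start time} for in-flight copies
    Returns a dict shaped like the removed compute_copies() result.
    """
    set_present = configured & set(present)
    set_ongoing = configured & set(ongoing)
    return {
        'present': set_present,
        'missing': configured - set_present - set_ongoing,
        'ongoing': {name: start for name, start in ongoing.items()
                    if name in set_ongoing},
    }

# Example mirroring compute_copies_extra_archive() above:
# split_copies({'uffizi', 'banco'}, ['banco', 'random_archive'], {})
# -> {'present': {'banco'}, 'missing': {'uffizi'}, 'ongoing': {}}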
