diff --git a/MANIFEST.in b/MANIFEST.in
index 8bd8f0a..2dfa00b 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,7 +1,8 @@
 include Makefile
 include Makefile.local
 include README.md
 include requirements.txt
 include requirements-swh.txt
 include version.txt
 recursive-include sql *
+recursive-include swh/archiver/tests/data *.sql
diff --git a/swh/archiver/tests/data/dumps/swh-archiver.sql b/swh/archiver/tests/data/dumps/swh-archiver.sql
new file mode 100644
index 0000000..75cb8de
--- /dev/null
+++ b/swh/archiver/tests/data/dumps/swh-archiver.sql
@@ -0,0 +1,384 @@
+--
+-- PostgreSQL database dump
+--
+
+-- Dumped from database version 10.4 (Debian 10.4-2.pgdg+1)
+-- Dumped by pg_dump version 10.4 (Debian 10.4-2.pgdg+1)
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET client_min_messages = warning;
+SET row_security = off;
+
+--
+-- Name: plpgsql; Type: EXTENSION; Schema: -; Owner: -
+--
+
+CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
+
+
+--
+-- Name: EXTENSION plpgsql; Type: COMMENT; Schema: -; Owner: -
+--
+
+COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
+
+
+--
+-- Name: archive_status; Type: TYPE; Schema: public; Owner: -
+--
+
+CREATE TYPE public.archive_status AS ENUM (
+    'missing',
+    'ongoing',
+    'present',
+    'corrupted'
+);
+
+
+--
+-- Name: TYPE archive_status; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TYPE public.archive_status IS 'Status of a given copy of a content';
+
+
+--
+-- Name: bucket; Type: DOMAIN; Schema: public; Owner: -
+--
+
+CREATE DOMAIN public.bucket AS bytea
+	CONSTRAINT bucket_check CHECK ((length(VALUE) = 2));
+
+
+--
+-- Name: sha1; Type: DOMAIN; Schema: public; Owner: -
+--
+
+CREATE DOMAIN public.sha1 AS bytea
+	CONSTRAINT sha1_check CHECK ((length(VALUE) = 20));
+
+
+--
+-- Name: swh_content_copies_from_temp(text[]); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_content_copies_from_temp(archive_names text[]) RETURNS void
+    LANGUAGE plpgsql
+    AS $$
+  begin
+    with existing_content_ids as (
+      select id
+      from content
+      inner join tmp_content on content.sha1 = tmp_content.sha1
+    ), created_content_ids as (
+      insert into content (sha1)
+      select sha1 from tmp_content
+      on conflict do nothing
+      returning id
+    ), content_ids as (
+      select * from existing_content_ids
+      union all
+      select * from created_content_ids
+    ), archive_ids as (
+      select id from archive
+      where name = any(archive_names)
+    ) insert into content_copies (content_id, archive_id, mtime, status)
+      select content_ids.id, archive_ids.id, now(), 'present'
+      from content_ids cross join archive_ids
+      on conflict (content_id, archive_id) do update
+      set mtime = excluded.mtime, status = excluded.status;
+  end
+$$;
+
+
+--
+-- Name: swh_mktemp_content(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_mktemp_content() RETURNS void
+    LANGUAGE plpgsql
+    AS $$
+  begin
+    create temporary table tmp_content (
+      sha1 sha1 not null
+    ) on commit drop;
+    return;
+  end
+$$;
+
+
+SET default_tablespace = '';
+
+SET default_with_oids = false;
+
+--
+-- Name: archive; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.archive (
+    id bigint NOT NULL,
+    name text NOT NULL
+);
+
+
+--
+-- Name: TABLE archive; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.archive IS 'The archives in which contents are stored';
+
+
+--
+-- Name: COLUMN archive.id; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.archive.id IS 'Short identifier for archives';
+
+
+--
+-- Name: COLUMN archive.name; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.archive.name IS 'Name of the archive';
+
+
+--
+-- Name: archive_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.archive_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+--
+-- Name: archive_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.archive_id_seq OWNED BY public.archive.id;
+
+
+--
+-- Name: content; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.content (
+    id bigint NOT NULL,
+    sha1 public.sha1 NOT NULL
+);
+
+
+--
+-- Name: TABLE content; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.content IS 'All the contents being archived by Software Heritage';
+
+
+--
+-- Name: COLUMN content.id; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.content.id IS 'Short id for the content being archived';
+
+
+--
+-- Name: COLUMN content.sha1; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.content.sha1 IS 'SHA1 hash of the content being archived';
+
+
+--
+-- Name: content_copies; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.content_copies (
+    content_id bigint NOT NULL,
+    archive_id bigint NOT NULL,
+    mtime timestamp with time zone,
+    status public.archive_status NOT NULL
+);
+
+
+--
+-- Name: TABLE content_copies; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.content_copies IS 'Tracking of all content copies in the archives';
+
+
+--
+-- Name: COLUMN content_copies.mtime; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.content_copies.mtime IS 'Last update time of the copy';
+
+
+--
+-- Name: COLUMN content_copies.status; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.content_copies.status IS 'Status of the copy';
+
+
+--
+-- Name: content_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.content_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+--
+-- Name: content_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.content_id_seq OWNED BY public.content.id;
+
+
+--
+-- Name: dbversion; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.dbversion (
+    version integer NOT NULL,
+    release timestamp with time zone,
+    description text
+);
+
+
+--
+-- Name: TABLE dbversion; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.dbversion IS 'Schema update tracking';
+
+
+--
+-- Name: archive id; Type: DEFAULT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.archive ALTER COLUMN id SET DEFAULT nextval('public.archive_id_seq'::regclass);
+
+
+--
+-- Name: content id; Type: DEFAULT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.content ALTER COLUMN id SET DEFAULT nextval('public.content_id_seq'::regclass);
+
+
+--
+-- Data for Name: archive; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.archive (id, name) FROM stdin;
+1	uffizi
+2	banco
+3	azure
+\.
+
+
+--
+-- Data for Name: content; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.content (id, sha1) FROM stdin;
+\.
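+
+--
+-- Usage sketch for the two functions above: a client stages sha1s in a
+-- temp table, then records copies. A minimal sketch, assuming a psql
+-- session and hex-escaped bytea input (the sha1 values are made up):
+--
+--   begin;
+--   select swh_mktemp_content();          -- temp table, dropped on commit
+--   copy tmp_content (sha1) from stdin;   -- one \\x-prefixed sha1 per line
+--   select swh_content_copies_from_temp(array['uffizi', 'banco']);
+--   commit;
+--
+-- This upserts a 'present' row in content_copies for every staged content
+-- in each named archive, creating missing content rows along the way.
+--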
+
+
+--
+-- Data for Name: content_copies; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.content_copies (content_id, archive_id, mtime, status) FROM stdin;
+\.
+
+
+--
+-- Data for Name: dbversion; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.dbversion (version, release, description) FROM stdin;
+10	2018-06-05 13:57:27.48746+02	Work In Progress
+\.
+
+
+--
+-- Name: archive_id_seq; Type: SEQUENCE SET; Schema: public; Owner: -
+--
+
+SELECT pg_catalog.setval('public.archive_id_seq', 3, true);
+
+
+--
+-- Name: content_id_seq; Type: SEQUENCE SET; Schema: public; Owner: -
+--
+
+SELECT pg_catalog.setval('public.content_id_seq', 1, false);
+
+
+--
+-- Name: archive archive_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.archive
+    ADD CONSTRAINT archive_pkey PRIMARY KEY (id);
+
+
+--
+-- Name: content_copies content_copies_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.content_copies
+    ADD CONSTRAINT content_copies_pkey PRIMARY KEY (content_id, archive_id);
+
+
+--
+-- Name: content content_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.content
+    ADD CONSTRAINT content_pkey PRIMARY KEY (id);
+
+
+--
+-- Name: dbversion dbversion_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.dbversion
+    ADD CONSTRAINT dbversion_pkey PRIMARY KEY (version);
+
+
+--
+-- Name: archive_name_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE UNIQUE INDEX archive_name_idx ON public.archive USING btree (name);
+
+
+--
+-- Name: content_sha1_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE UNIQUE INDEX content_sha1_idx ON public.content USING btree (sha1);
+
+
+--
+-- PostgreSQL database dump complete
+--
+
diff --git a/swh/archiver/tests/test_archiver.py b/swh/archiver/tests/test_archiver.py
index fa64a2e..710dfac 100644
--- a/swh/archiver/tests/test_archiver.py
+++ b/swh/archiver/tests/test_archiver.py
@@ -1,453 +1,453 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import glob
 import tempfile
 import shutil
 import unittest
 import os
 
 from nose.tools import istest
 from nose.plugins.attrib import attr
 
 from swh.core.tests.db_testing import SingleDbTestFixture
 from swh.archiver.storage import get_archiver_storage
 from swh.archiver import ArchiverWithRetentionPolicyDirector
 from swh.archiver import ArchiverWithRetentionPolicyWorker
 from swh.archiver.db import utcnow
 
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
+
+DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
 
 
 @attr('db')
 class TestArchiver(SingleDbTestFixture, unittest.TestCase):
     """ Test the objstorage archiver.
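 
     The test database is rebuilt from data/dumps/swh-archiver.sql, a plain
     SQL dump loaded through psql; it creates the archiver schema and
     pre-registers the 'uffizi', 'banco' and 'azure' archives.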
""" TEST_DB_NAME = 'softwareheritage-archiver-test' - TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump') - TEST_DB_DUMP_TYPE = 'pg_dump' + TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps', 'swh-archiver.sql') + TEST_DB_DUMP_TYPE = 'psql' def setUp(self): # Launch the backup server super().setUp() # Create source storage self.src_root = tempfile.mkdtemp() src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) self.dest_root = tempfile.mkdtemp(prefix='remote') dest_config = { 'cls': 'pathslicing', 'args': { 'root': self.dest_root, 'slicing': '0:2/2:4/4:6', } } self.dest_storage = get_objstorage(**dest_config) # Keep mapped the id to the storages self.storages = { 'uffizi': self.src_storage, 'banco': self.dest_storage } # Override configurations src_archiver_conf = {'host': 'uffizi'} dest_archiver_conf = {'host': 'banco'} src_archiver_conf.update(src_config) dest_archiver_conf.update(dest_config) self.archiver_storages = [src_archiver_conf, dest_archiver_conf] self._override_director_config() self._override_worker_config() # Create the base archiver self.archiver = self._create_director() def tearDown(self): self.empty_tables() shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) super().tearDown() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content') self.cursor.execute('DELETE FROM content_copies') self.conn.commit() def _override_director_config(self, retention_policy=2): """ Override the default config of the Archiver director to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'batch_max_size': 5000, 'archival_max_age': 3600, 'retention_policy': retention_policy, 'asynchronous': False, 'max_queue_length': 100000, 'queue_throttling_delay': 120, } def _override_worker_config(self): """ Override the default config of the Archiver worker to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa 'retention_policy': 2, 'archival_max_age': 3600, 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'storages': self.archiver_storages, 'source': 'uffizi', 'sources': ['uffizi'], } def _create_director(self): return ArchiverWithRetentionPolicyDirector(start_id=None) def _create_worker(self, batch={}): return ArchiverWithRetentionPolicyWorker(batch) def _add_content(self, storage_name, content_data): """ Add really a content to the given objstorage This put an empty status for the added content. Args: storage_name: the concerned storage content_data: the data to insert with_row_insert: to insert a row entry in the db or not """ # Add the content to the storage obj_id = self.storages[storage_name].add(content_data) self.cursor.execute(""" INSERT INTO content (sha1) VALUES (%s) """, (obj_id,)) return obj_id def _update_status(self, obj_id, storage_name, status, date=None): """ Update the db status for the given id/storage_name. This does not create the content in the storage. 
""" self.cursor.execute("""insert into archive (name) values (%s) on conflict do nothing""", (storage_name,)) self.archiver.archiver_storage.content_archive_update( obj_id, storage_name, status ) # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ obj_data = b'archive_missing_content' obj_id = self._add_content('uffizi', obj_data) self._update_status(obj_id, 'uffizi', 'present') # Content is missing on banco (entry not present in the db) try: self.dest_storage.get(obj_id) except ObjNotFoundError: pass else: self.fail('Content should not be present before archival') self.archiver.run() # now the content should be present on remote objstorage remote_data = self.dest_storage.get(obj_id) self.assertEquals(obj_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_present_content') self._update_status(obj_id, 'uffizi', 'present') self._update_status(obj_id, 'banco', 'present') # After the run, the content should NOT be in the archive. # As the archiver believe it was already in. self.archiver.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self._override_director_config(retention_policy=1) director = self._create_director() # Obj is present in only one archive but only one copy is required. director.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) @istest def content_archive_get_copies(self): self.assertCountEqual( self.archiver.archiver_storage.content_archive_get_copies(), [], ) obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self.assertCountEqual( self.archiver.archiver_storage.content_archive_get_copies(), [(obj_id, ['uffizi'], {})], ) # Unit tests for archive worker def archival_elapsed(self, mtime): return self._create_worker()._is_archival_delay_elapsed(mtime) @istest def vstatus_ongoing_remaining(self): self.assertFalse(self.archival_elapsed(utcnow())) @istest def vstatus_ongoing_elapsed(self): past_time = (utcnow() - datetime.timedelta( seconds=self._create_worker().archival_max_age)) self.assertTrue(self.archival_elapsed(past_time)) @istest def need_archival_missing(self): """ A content should need archival when it is missing. 
""" status_copies = {'present': ['uffizi'], 'missing': ['banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), True) @istest def need_archival_present(self): """ A content present everywhere shouldn't need archival """ status_copies = {'present': ['uffizi', 'banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), False) def _compute_copies_status(self, status): """ A content with a given status should be detected correctly """ obj_id = self._add_content( 'banco', b'compute_copies_' + bytes(status, 'utf8')) self._update_status(obj_id, 'banco', status) worker = self._create_worker() self.assertIn('banco', worker.compute_copies( set(worker.objstorages), obj_id)[status]) @istest def compute_copies_present(self): """ A present content should be detected with correct status """ self._compute_copies_status('present') @istest def compute_copies_missing(self): """ A missing content should be detected with correct status """ self._compute_copies_status('missing') @istest def compute_copies_extra_archive(self): obj_id = self._add_content('banco', b'foobar') self._update_status(obj_id, 'banco', 'present') self._update_status(obj_id, 'random_archive', 'present') worker = self._create_worker() copies = worker.compute_copies(set(worker.objstorages), obj_id) self.assertEqual(copies['present'], {'banco'}) self.assertEqual(copies['missing'], {'uffizi'}) def _get_backups(self, present, missing): """ Return a list of the pair src/dest from the present and missing """ worker = self._create_worker() return list(worker.choose_backup_servers(present, missing)) @istest def choose_backup_servers(self): self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) # Even with more possible destinations, do not take more than the # retention_policy require self.assertEqual( len(self._get_backups(['uffizi'], ['banco', 's3'])), 1 ) class TestArchiverStorageStub(unittest.TestCase): def setUp(self): self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) # Create destination storage dest_config = { 'cls': 'pathslicing', 'args': { 'root': self.dest_root, 'slicing': '0:2/2:4/4:6' } } self.dest_storage = get_objstorage(**dest_config) self.config = { 'cls': 'stub', 'args': { 'archives': { 'present_archive': 'http://uffizi:5003', 'missing_archive': 'http://banco:5003', }, 'present': ['present_archive'], 'missing': ['missing_archive'], 'logfile_base': os.path.join(self.log_root, 'log_'), } } # Generated with: # # id_length = 20 # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') # self.content_ids = [ b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', ] self.archiver_storage = get_archiver_storage(**self.config) super().setUp() def tearDown(self): shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) shutil.rmtree(self.log_root) super().tearDown() @istest def archive_ls(self): self.assertCountEqual( 
             self.archiver_storage.archive_ls(),
             self.config['args']['archives'].items()
         )
 
     @istest
     def content_archive_get(self):
         for content_id in self.content_ids:
             self.assertEqual(
                 self.archiver_storage.content_archive_get(content_id),
                 (content_id, set(self.config['args']['present']), {}),
             )
 
     @istest
     def content_archive_get_copies(self):
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_copies(),
             [],
         )
 
     @istest
     def content_archive_get_unarchived_copies(self):
         retention_policy = 2
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_unarchived_copies(
                 retention_policy),
             [],
         )
 
     @istest
     def content_archive_get_missing(self):
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_missing(
                 self.content_ids,
                 'missing_archive'
             ),
             self.content_ids,
         )
 
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_missing(
                 self.content_ids,
                 'present_archive'
             ),
             [],
         )
 
         with self.assertRaises(ValueError):
             list(self.archiver_storage.content_archive_get_missing(
                 self.content_ids,
                 'unknown_archive'
             ))
 
     @istest
     def content_archive_get_unknown(self):
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_unknown(
                 self.content_ids,
             ),
             [],
         )
 
     @istest
     def content_archive_update(self):
         for content_id in self.content_ids:
             self.archiver_storage.content_archive_update(
                 content_id, 'present_archive', 'present')
             self.archiver_storage.content_archive_update(
                 content_id, 'missing_archive', 'present')
 
         self.archiver_storage.close_logfile()
 
         # Make sure we created a logfile
         files = glob.glob('%s*' % self.config['args']['logfile_base'])
         self.assertEqual(len(files), 1)
 
         # Make sure the logfile contains all our lines
         with open(files[0]) as logfile:
             lines = logfile.readlines()
         self.assertEqual(len(lines), 2 * len(self.content_ids))
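 
 
 # A minimal sketch of running this suite locally, assuming a reachable
 # PostgreSQL server (SingleDbTestFixture creates TEST_DB_NAME and loads
 # the psql dump by itself) and the nose attrib plugin for test selection:
 #
 #   nosetests swh/archiver/tests/test_archiver.py -a db   # db-backed tests
 #   nosetests swh/archiver/tests/test_archiver.py         # whole module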