diff --git a/MANIFEST.in b/MANIFEST.in index 5dcdb71..a2dfbe5 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,9 +1,8 @@ include Makefile include Makefile.local include README.md include requirements.txt include requirements-swh.txt include version.txt recursive-include sql * -recursive-include swh/archiver/tests/data *.sql recursive-include swh/archiver/sql *.sql diff --git a/swh/archiver/tests/__init__.py b/swh/archiver/tests/__init__.py index e69de29..dc8015f 100644 --- a/swh/archiver/tests/__init__.py +++ b/swh/archiver/tests/__init__.py @@ -0,0 +1,5 @@ +from os import path +import swh.archiver + + +SQL_DIR = path.join(path.dirname(swh.archiver.__file__), 'sql') diff --git a/swh/archiver/tests/data/dumps/swh-archiver.sql b/swh/archiver/tests/data/dumps/swh-archiver.sql deleted file mode 100644 index 75cb8de..0000000 --- a/swh/archiver/tests/data/dumps/swh-archiver.sql +++ /dev/null @@ -1,384 +0,0 @@ --- --- PostgreSQL database dump --- - --- Dumped from database version 10.4 (Debian 10.4-2.pgdg+1) --- Dumped by pg_dump version 10.4 (Debian 10.4-2.pgdg+1) - -SET statement_timeout = 0; -SET lock_timeout = 0; -SET idle_in_transaction_session_timeout = 0; -SET client_encoding = 'UTF8'; -SET standard_conforming_strings = on; -SELECT pg_catalog.set_config('search_path', '', false); -SET check_function_bodies = false; -SET client_min_messages = warning; -SET row_security = off; - --- --- Name: plpgsql; Type: EXTENSION; Schema: -; Owner: - --- - -CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog; - - --- --- Name: EXTENSION plpgsql; Type: COMMENT; Schema: -; Owner: - --- - -COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language'; - - --- --- Name: archive_status; Type: TYPE; Schema: public; Owner: - --- - -CREATE TYPE public.archive_status AS ENUM ( - 'missing', - 'ongoing', - 'present', - 'corrupted' -); - - --- --- Name: TYPE archive_status; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON TYPE public.archive_status IS 'Status of a given copy of a content'; - - --- --- Name: bucket; Type: DOMAIN; Schema: public; Owner: - --- - -CREATE DOMAIN public.bucket AS bytea - CONSTRAINT bucket_check CHECK ((length(VALUE) = 2)); - - --- --- Name: sha1; Type: DOMAIN; Schema: public; Owner: - --- - -CREATE DOMAIN public.sha1 AS bytea - CONSTRAINT sha1_check CHECK ((length(VALUE) = 20)); - - --- --- Name: swh_content_copies_from_temp(text[]); Type: FUNCTION; Schema: public; Owner: - --- - -CREATE FUNCTION public.swh_content_copies_from_temp(archive_names text[]) RETURNS void - LANGUAGE plpgsql - AS $$ - begin - with existing_content_ids as ( - select id - from content - inner join tmp_content on content.sha1 = tmp.sha1 - ), created_content_ids as ( - insert into content (sha1) - select sha1 from tmp_content - on conflict do nothing - returning id - ), content_ids as ( - select * from existing_content_ids - union all - select * from created_content_ids - ), archive_ids as ( - select id from archive - where name = any(archive_names) - ) insert into content_copies (content_id, archive_id, mtime, status) - select content_ids.id, archive_ids.id, now(), 'present' - from content_ids cross join archive_ids - on conflict (content_id, archive_id) do update - set mtime = excluded.mtime, status = excluded.status; - end -$$; - - --- --- Name: swh_mktemp_content(); Type: FUNCTION; Schema: public; Owner: - --- - -CREATE FUNCTION public.swh_mktemp_content() RETURNS void - LANGUAGE plpgsql - AS $$ - begin - create temporary table tmp_content ( - sha1 sha1 not null - ) on commit drop; - return; 
- end -$$; - - -SET default_tablespace = ''; - -SET default_with_oids = false; - --- --- Name: archive; Type: TABLE; Schema: public; Owner: - --- - -CREATE TABLE public.archive ( - id bigint NOT NULL, - name text NOT NULL -); - - --- --- Name: TABLE archive; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON TABLE public.archive IS 'The archives in which contents are stored'; - - --- --- Name: COLUMN archive.id; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON COLUMN public.archive.id IS 'Short identifier for archives'; - - --- --- Name: COLUMN archive.name; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON COLUMN public.archive.name IS 'Name of the archive'; - - --- --- Name: archive_id_seq; Type: SEQUENCE; Schema: public; Owner: - --- - -CREATE SEQUENCE public.archive_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - --- --- Name: archive_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - --- - -ALTER SEQUENCE public.archive_id_seq OWNED BY public.archive.id; - - --- --- Name: content; Type: TABLE; Schema: public; Owner: - --- - -CREATE TABLE public.content ( - id bigint NOT NULL, - sha1 public.sha1 NOT NULL -); - - --- --- Name: TABLE content; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON TABLE public.content IS 'All the contents being archived by Software Heritage'; - - --- --- Name: COLUMN content.id; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON COLUMN public.content.id IS 'Short id for the content being archived'; - - --- --- Name: COLUMN content.sha1; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON COLUMN public.content.sha1 IS 'SHA1 hash of the content being archived'; - - --- --- Name: content_copies; Type: TABLE; Schema: public; Owner: - --- - -CREATE TABLE public.content_copies ( - content_id bigint NOT NULL, - archive_id bigint NOT NULL, - mtime timestamp with time zone, - status public.archive_status NOT NULL -); - - --- --- Name: TABLE content_copies; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON TABLE public.content_copies IS 'Tracking of all content copies in the archives'; - - --- --- Name: COLUMN content_copies.mtime; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON COLUMN public.content_copies.mtime IS 'Last update time of the copy'; - - --- --- Name: COLUMN content_copies.status; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON COLUMN public.content_copies.status IS 'Status of the copy'; - - --- --- Name: content_id_seq; Type: SEQUENCE; Schema: public; Owner: - --- - -CREATE SEQUENCE public.content_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - --- --- Name: content_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - --- - -ALTER SEQUENCE public.content_id_seq OWNED BY public.content.id; - - --- --- Name: dbversion; Type: TABLE; Schema: public; Owner: - --- - -CREATE TABLE public.dbversion ( - version integer NOT NULL, - release timestamp with time zone, - description text -); - - --- --- Name: TABLE dbversion; Type: COMMENT; Schema: public; Owner: - --- - -COMMENT ON TABLE public.dbversion IS 'Schema update tracking'; - - --- --- Name: archive id; Type: DEFAULT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.archive ALTER COLUMN id SET DEFAULT nextval('public.archive_id_seq'::regclass); - - --- --- Name: content id; Type: DEFAULT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.content ALTER COLUMN id SET DEFAULT nextval('public.content_id_seq'::regclass); - - --- --- Data for Name: 
archive; Type: TABLE DATA; Schema: public; Owner: - --- - -COPY public.archive (id, name) FROM stdin; -1 uffizi -2 banco -3 azure -\. - - --- --- Data for Name: content; Type: TABLE DATA; Schema: public; Owner: - --- - -COPY public.content (id, sha1) FROM stdin; -\. - - --- --- Data for Name: content_copies; Type: TABLE DATA; Schema: public; Owner: - --- - -COPY public.content_copies (content_id, archive_id, mtime, status) FROM stdin; -\. - - --- --- Data for Name: dbversion; Type: TABLE DATA; Schema: public; Owner: - --- - -COPY public.dbversion (version, release, description) FROM stdin; -10 2018-06-05 13:57:27.48746+02 Work In Progress -\. - - --- --- Name: archive_id_seq; Type: SEQUENCE SET; Schema: public; Owner: - --- - -SELECT pg_catalog.setval('public.archive_id_seq', 3, true); - - --- --- Name: content_id_seq; Type: SEQUENCE SET; Schema: public; Owner: - --- - -SELECT pg_catalog.setval('public.content_id_seq', 1, false); - - --- --- Name: archive archive_pkey; Type: CONSTRAINT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.archive - ADD CONSTRAINT archive_pkey PRIMARY KEY (id); - - --- --- Name: content_copies content_copies_pkey; Type: CONSTRAINT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.content_copies - ADD CONSTRAINT content_copies_pkey PRIMARY KEY (content_id, archive_id); - - --- --- Name: content content_pkey; Type: CONSTRAINT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.content - ADD CONSTRAINT content_pkey PRIMARY KEY (id); - - --- --- Name: dbversion dbversion_pkey; Type: CONSTRAINT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.dbversion - ADD CONSTRAINT dbversion_pkey PRIMARY KEY (version); - - --- --- Name: archive_name_idx; Type: INDEX; Schema: public; Owner: - --- - -CREATE UNIQUE INDEX archive_name_idx ON public.archive USING btree (name); - - --- --- Name: content_sha1_idx; Type: INDEX; Schema: public; Owner: - --- - -CREATE UNIQUE INDEX content_sha1_idx ON public.content USING btree (sha1); - - --- --- PostgreSQL database dump complete --- - diff --git a/swh/archiver/tests/test_archiver.py b/swh/archiver/tests/test_archiver.py index ea45df8..008b69f 100644 --- a/swh/archiver/tests/test_archiver.py +++ b/swh/archiver/tests/test_archiver.py @@ -1,429 +1,427 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import glob import os import shutil import tempfile import unittest from nose.plugins.attrib import attr from swh.archiver import (ArchiverWithRetentionPolicyDirector, ArchiverWithRetentionPolicyWorker) from swh.archiver.db import utcnow from swh.archiver.storage import get_archiver_storage from swh.core.tests.db_testing import SingleDbTestFixture from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError - -DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') +from swh.archiver.tests import SQL_DIR @attr('db') class TestArchiver(SingleDbTestFixture, unittest.TestCase): """ Test the objstorage archiver. 
""" TEST_DB_NAME = 'softwareheritage-archiver-test' - TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps', 'swh-archiver.sql') - TEST_DB_DUMP_TYPE = 'psql' + TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') def setUp(self): # Launch the backup server super().setUp() # Create source storage self.src_root = tempfile.mkdtemp() src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) self.dest_root = tempfile.mkdtemp(prefix='remote') dest_config = { 'cls': 'pathslicing', 'args': { 'root': self.dest_root, 'slicing': '0:2/2:4/4:6', } } self.dest_storage = get_objstorage(**dest_config) # Keep mapped the id to the storages self.storages = { 'uffizi': self.src_storage, 'banco': self.dest_storage } # Override configurations src_archiver_conf = {'host': 'uffizi'} dest_archiver_conf = {'host': 'banco'} src_archiver_conf.update(src_config) dest_archiver_conf.update(dest_config) self.archiver_storages = [src_archiver_conf, dest_archiver_conf] self._override_director_config() self._override_worker_config() # Create the base archiver self.archiver = self._create_director() def tearDown(self): self.empty_tables() shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) super().tearDown() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content') self.cursor.execute('DELETE FROM content_copies') self.conn.commit() def _override_director_config(self, retention_policy=2): """ Override the default config of the Archiver director to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'batch_max_size': 5000, 'archival_max_age': 3600, 'retention_policy': retention_policy, 'asynchronous': False, 'max_queue_length': 100000, 'queue_throttling_delay': 120, } def _override_worker_config(self): """ Override the default config of the Archiver worker to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa 'retention_policy': 2, 'archival_max_age': 3600, 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'storages': self.archiver_storages, 'source': 'uffizi', 'sources': ['uffizi'], } def _create_director(self): return ArchiverWithRetentionPolicyDirector(start_id=None) def _create_worker(self, batch={}): return ArchiverWithRetentionPolicyWorker(batch) def _add_content(self, storage_name, content_data): """ Add really a content to the given objstorage This put an empty status for the added content. Args: storage_name: the concerned storage content_data: the data to insert with_row_insert: to insert a row entry in the db or not """ # Add the content to the storage obj_id = self.storages[storage_name].add(content_data) self.cursor.execute(""" INSERT INTO content (sha1) VALUES (%s) """, (obj_id,)) return obj_id def _update_status(self, obj_id, storage_name, status, date=None): """ Update the db status for the given id/storage_name. This does not create the content in the storage. 
""" self.cursor.execute("""insert into archive (name) values (%s) on conflict do nothing""", (storage_name,)) self.archiver.archiver_storage.content_archive_update( obj_id, storage_name, status ) # Integration test def test_archive_missing_content(self): """ Run archiver on a missing content should archive it. """ obj_data = b'archive_missing_content' obj_id = self._add_content('uffizi', obj_data) self._update_status(obj_id, 'uffizi', 'present') # Content is missing on banco (entry not present in the db) try: self.dest_storage.get(obj_id) except ObjNotFoundError: pass else: self.fail('Content should not be present before archival') self.archiver.run() # now the content should be present on remote objstorage remote_data = self.dest_storage.get(obj_id) self.assertEquals(obj_data, remote_data) def test_archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_present_content') self._update_status(obj_id, 'uffizi', 'present') self._update_status(obj_id, 'banco', 'present') # After the run, the content should NOT be in the archive. # As the archiver believe it was already in. self.archiver.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) def test_archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self._override_director_config(retention_policy=1) director = self._create_director() # Obj is present in only one archive but only one copy is required. director.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) def test_content_archive_get_copies(self): self.assertCountEqual( self.archiver.archiver_storage.content_archive_get_copies(), [], ) obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self.assertCountEqual( self.archiver.archiver_storage.content_archive_get_copies(), [(obj_id, ['uffizi'], {})], ) # Unit tests for archive worker def archival_elapsed(self, mtime): return self._create_worker()._is_archival_delay_elapsed(mtime) def test_vstatus_ongoing_remaining(self): self.assertFalse(self.archival_elapsed(utcnow())) def test_vstatus_ongoing_elapsed(self): past_time = (utcnow() - datetime.timedelta( seconds=self._create_worker().archival_max_age)) self.assertTrue(self.archival_elapsed(past_time)) def test_need_archival_missing(self): """ A content should need archival when it is missing. 
""" status_copies = {'present': ['uffizi'], 'missing': ['banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), True) def test_need_archival_present(self): """ A content present everywhere shouldn't need archival """ status_copies = {'present': ['uffizi', 'banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), False) def _compute_copies_status(self, status): """ A content with a given status should be detected correctly """ obj_id = self._add_content( 'banco', b'compute_copies_' + bytes(status, 'utf8')) self._update_status(obj_id, 'banco', status) worker = self._create_worker() self.assertIn('banco', worker.compute_copies( set(worker.objstorages), obj_id)[status]) def test_compute_copies_present(self): """ A present content should be detected with correct status """ self._compute_copies_status('present') def test_compute_copies_missing(self): """ A missing content should be detected with correct status """ self._compute_copies_status('missing') def test_compute_copies_extra_archive(self): obj_id = self._add_content('banco', b'foobar') self._update_status(obj_id, 'banco', 'present') self._update_status(obj_id, 'random_archive', 'present') worker = self._create_worker() copies = worker.compute_copies(set(worker.objstorages), obj_id) self.assertEqual(copies['present'], {'banco'}) self.assertEqual(copies['missing'], {'uffizi'}) def _get_backups(self, present, missing): """ Return a list of the pair src/dest from the present and missing """ worker = self._create_worker() return list(worker.choose_backup_servers(present, missing)) def test_choose_backup_servers(self): self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) # Even with more possible destinations, do not take more than the # retention_policy require self.assertEqual( len(self._get_backups(['uffizi'], ['banco', 's3'])), 1 ) class TestArchiverStorageStub(unittest.TestCase): def setUp(self): self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) # Create destination storage dest_config = { 'cls': 'pathslicing', 'args': { 'root': self.dest_root, 'slicing': '0:2/2:4/4:6' } } self.dest_storage = get_objstorage(**dest_config) self.config = { 'cls': 'stub', 'args': { 'archives': { 'present_archive': 'http://uffizi:5003', 'missing_archive': 'http://banco:5003', }, 'present': ['present_archive'], 'missing': ['missing_archive'], 'logfile_base': os.path.join(self.log_root, 'log_'), } } # Generated with: # # id_length = 20 # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') # self.content_ids = [ b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', ] self.archiver_storage = get_archiver_storage(**self.config) super().setUp() def tearDown(self): shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) shutil.rmtree(self.log_root) super().tearDown() def test_archive_ls(self): self.assertCountEqual( self.archiver_storage.archive_ls(), 
self.config['args']['archives'].items() ) def test_content_archive_get(self): for content_id in self.content_ids: self.assertEqual( self.archiver_storage.content_archive_get(content_id), (content_id, set(self.config['args']['present']), {}), ) def test_content_archive_get_copies(self): self.assertCountEqual( self.archiver_storage.content_archive_get_copies(), [], ) def test_content_archive_get_unarchived_copies(self): retention_policy = 2 self.assertCountEqual( self.archiver_storage.content_archive_get_unarchived_copies( retention_policy), [], ) def test_content_archive_get_missing(self): self.assertCountEqual( self.archiver_storage.content_archive_get_missing( self.content_ids, 'missing_archive' ), self.content_ids, ) self.assertCountEqual( self.archiver_storage.content_archive_get_missing( self.content_ids, 'present_archive' ), [], ) with self.assertRaises(ValueError): list(self.archiver_storage.content_archive_get_missing( self.content_ids, 'unknown_archive' )) def test_content_archive_get_unknown(self): self.assertCountEqual( self.archiver_storage.content_archive_get_unknown( self.content_ids, ), [], ) def test_content_archive_update(self): for content_id in self.content_ids: self.archiver_storage.content_archive_update( content_id, 'present_archive', 'present') self.archiver_storage.content_archive_update( content_id, 'missing_archive', 'present') self.archiver_storage.close_logfile() # Make sure we created a logfile files = glob.glob('%s*' % self.config['args']['logfile_base']) self.assertEqual(len(files), 1) # make sure the logfile contains all our lines lines = open(files[0]).readlines() self.assertEqual(len(lines), 2 * len(self.content_ids))
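The net effect of this change is that the test database is now built from the packaged schema files under swh/archiver/sql (exposed to tests as SQL_DIR) instead of a checked-in pg_dump snapshot, with TEST_DB_DUMP set to the glob os.path.join(SQL_DIR, '*.sql'). As a rough illustration only, the sketch below shows one way such a glob of .sql files could be applied to a throwaway test database; it is not the swh.core.tests.db_testing implementation, and the helper name, DSN, and use of psycopg2 are assumptions for the example.

# Hedged sketch: apply every packaged *.sql file to a test database, in
# filename order. apply_sql_dir and the DSN below are illustrative, not
# part of swh.core; the SingleDbTestFixture handles this internally.
import glob
import os.path

import psycopg2

from swh.archiver.tests import SQL_DIR


def apply_sql_dir(dsn, sql_dir=SQL_DIR):
    """Execute each *.sql file found in sql_dir against the database at dsn."""
    conn = psycopg2.connect(dsn)
    try:
        with conn:
            with conn.cursor() as cur:
                for sql_file in sorted(glob.glob(os.path.join(sql_dir, '*.sql'))):
                    with open(sql_file) as f:
                        cur.execute(f.read())
    finally:
        conn.close()


if __name__ == '__main__':
    # Database name taken from TEST_DB_NAME in the tests above.
    apply_sql_dir('dbname=softwareheritage-archiver-test')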