diff --git a/.gitignore b/.gitignore deleted file mode 100644 index c5baade..0000000 --- a/.gitignore +++ /dev/null @@ -1,9 +0,0 @@ -*.pyc -*.sw? -*~ -.coverage -.eggs/ -__pycache__ -dist -*.egg-info -version.txt diff --git a/MANIFEST.in b/MANIFEST.in index 2a92ebe..a2dfbe5 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,8 +1,8 @@ include Makefile include Makefile.local -include README.db_testing -include README.dev +include README.md include requirements.txt include requirements-swh.txt include version.txt recursive-include sql * +recursive-include swh/archiver/sql *.sql diff --git a/PKG-INFO b/PKG-INFO index 456ab87..b42171b 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,71 @@ -Metadata-Version: 1.0 +Metadata-Version: 2.1 Name: swh.archiver -Version: 0.0.4 +Version: 0.0.6 Summary: Software Heritage archiver Home-page: https://forge.softwareheritage.org/diffusion/DARC/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Description: UNKNOWN +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-archiver +Description: swh-archiver + ============ + + Orchestrator in charge of guaranteeing that object storage content is pristine + and available in a sufficient amount of copies. + + Tests + ----- + + This module's tests need a postgres db to run. It is recommended to run + those but they can be skipped: + + - `make test`: will run all tests + - `make test-nodb`: will run only tests that do not need a local DB + - `make test-db`: will run only tests that do need a local DB + + If you do want to run DB-related tests, you should ensure you have access with + sufficient privileges to a Postgresql database. + + ### Using your system database + + You need to: + + - ensure that your user is authorized to create and drop DBs, and in particular + DBs named "softwareheritage-test" and "softwareheritage-dev" + + - ensure that you have the storage testdata repository checked out in + ../swh-storage-testdata + + ### Using pifpaf + + [pifpaf](https://github.com/jd/pifpaf) is a suite of fixtures and a + command-line tool that allows to start and stop daemons for a quick throw-away + usage. + + It can be used to run tests that need a Postgres database without any other + configuration reauired nor the need to have special access to a running + database: + + ```bash + + $ pifpaf run postgresql make test-db + [snip] + ---------------------------------------------------------------------- + Ran 12 tests in 0.903s + + OK + ``` + + Note that pifpaf is not yet available as a Debian package, so you may have to + install it in a venv. + Platform: UNKNOWN +Classifier: Programming Language :: Python :: 3 +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) +Classifier: Operating System :: OS Independent +Classifier: Development Status :: 5 - Production/Stable +Description-Content-Type: text/markdown +Provides-Extra: testing diff --git a/README.db_testing b/README.db_testing deleted file mode 100644 index 8617980..0000000 --- a/README.db_testing +++ /dev/null @@ -1,17 +0,0 @@ -Running database-related tests -============================== - -Python tests for this module include tests that cannot be run without a local -Postgres database. 
You are not obliged to run those tests though: - -- `make test`: will run all tests -- `make test-nodb`: will run only tests that do not need a local DB -- `make test-db`: will run only tests that do need a local DB - -If you do want to run DB-related tests, you should: - -- ensure that your user is authorized to create and drop DBs, and in particular - DBs named "softwareheritage-test" and "softwareheritage-dev" - -- ensure that you have the storage testdata repository checked out in - ../swh-storage-testdata diff --git a/README.md b/README.md new file mode 100644 index 0000000..05a0164 --- /dev/null +++ b/README.md @@ -0,0 +1,51 @@ +swh-archiver +============ + +Orchestrator in charge of guaranteeing that object storage content is pristine +and available in a sufficient amount of copies. + +Tests +----- + +This module's tests need a postgres db to run. It is recommended to run +those but they can be skipped: + +- `make test`: will run all tests +- `make test-nodb`: will run only tests that do not need a local DB +- `make test-db`: will run only tests that do need a local DB + +If you do want to run DB-related tests, you should ensure you have access with +sufficient privileges to a Postgresql database. + +### Using your system database + +You need to: + +- ensure that your user is authorized to create and drop DBs, and in particular + DBs named "softwareheritage-test" and "softwareheritage-dev" + +- ensure that you have the storage testdata repository checked out in + ../swh-storage-testdata + +### Using pifpaf + +[pifpaf](https://github.com/jd/pifpaf) is a suite of fixtures and a +command-line tool that allows to start and stop daemons for a quick throw-away +usage. + +It can be used to run tests that need a Postgres database without any other +configuration reauired nor the need to have special access to a running +database: + +```bash + +$ pifpaf run postgresql make test-db +[snip] +---------------------------------------------------------------------- +Ran 12 tests in 0.903s + +OK +``` + +Note that pifpaf is not yet available as a Debian package, so you may have to +install it in a venv. diff --git a/docs/.gitignore b/docs/.gitignore deleted file mode 100644 index 58a761e..0000000 --- a/docs/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -_build/ -apidoc/ -*-stamp diff --git a/docs/Makefile b/docs/Makefile deleted file mode 100644 index c30c50a..0000000 --- a/docs/Makefile +++ /dev/null @@ -1 +0,0 @@ -include ../../swh-docs/Makefile.sphinx diff --git a/docs/_static/.placeholder b/docs/_static/.placeholder deleted file mode 100644 index e69de29..0000000 diff --git a/docs/_templates/.placeholder b/docs/_templates/.placeholder deleted file mode 100644 index e69de29..0000000 diff --git a/docs/archiver-blueprint.md b/docs/archiver-blueprint.md deleted file mode 100644 index 366ac7a..0000000 --- a/docs/archiver-blueprint.md +++ /dev/null @@ -1,230 +0,0 @@ -Software Heritage Archiver -========================== - -The Software Heritage (SWH) Archiver is responsible for backing up SWH objects -as to reduce the risk of losing them. - -Currently, the archiver only deals with content objects (i.e., those referenced -by the content table in the DB and stored in the SWH object storage). The -database itself is lively replicated by other means. - - -Requirements ------------- - -* **Peer-to-peer topology** - - Every storage involved in the archival process can be used as a source or a - destination for the archival, depending on the blobs it contains. 
A - retention policy specifies the minimum number of copies that are required - to be "safe". - - Although the servers are totally equals the coordination of which content - should be copied and from where to where is centralized. - -* **Append-only archival** - - The archiver treats involved storages as append-only storages. The archiver - never deletes any object. If removals are needed, they will be dealt with - by other means. - -* **Asynchronous archival.** - - Periodically (e.g., via cron), the archiver runs, produces a list of objects - that need to have more copies, and starts copying them over. The decision of - which storages are choosen to be sources and destinations are not up to the - storages themselves. - - Very likely, during any given archival run, other new objects will be added - to storages; it will be the responsibility of *future* archiver runs, and - not the current one, to copy new objects over if needed. - -* **Integrity at archival time.** - - Before archiving objects, the archiver performs suitable integrity checks on - them. For instance, content objects are verified to ensure that they can be - decompressed and that their content match their (checksum-based) unique - identifiers. Corrupt objects will not be archived and suitable errors - reporting about the corruption will be emitted. - - Note that archival-time integrity checks are *not meant to replace periodic - integrity checks*. - -* **Parallel archival** - - Once the list of objects to be archived has been identified, it SHOULD be - possible to archive objects in parallel w.r.t. one another. - -* **Persistent archival status** - - The archiver maintains a mapping between objects and their storage - locations. Locations are the set {master, slave_1, ..., slave_n}. - - Each pair is also associated to the following - information: - - * **status**: 4-state: *missing* (copy not present at destination), *ongoing* - (copy to destination ongoing), *present* (copy present at destination), - *corrupted* (content detected as corrupted during an archival). - * **mtime**: timestamp of last status change. This is either the destination - archival time (when status=present), or the timestamp of the last archival - request (status=ongoing); the timestamp is undefined when status=missing. - - -Architecture ------------- - -The archiver is comprised of the following software components: - -* archiver director -* archiver worker -* archiver copier - - -### Archiver director - -The archiver director is run periodically, e.g., via cron. - -Runtime parameters: - -* execution periodicity (external) -* retention policy -* archival max age -* archival batch size - -At each execution the director: - -1. for each object: retrieve its archival status -2. for each object that has fewer copies than those requested by the - retention policy: - 1. mark object as needing archival -3. group objects in need of archival in batches of `archival batch size` -4. for each batch: - 1. spawn an archive worker on the whole batch (e.g., submitting the relevant - celery task) - - -### Archiver worker - -The archiver is executed on demand (e.g., by a celery worker) to archive a -given set of objects. - -Runtime parameters: - -* objects to archive -* archival policies (retention & archival max age) - -At each execution a worker: - -1. for each object in the batch - 1. check that the object still need to be archived - (#present copies < retention policy) - 2. 
if an object has status=ongoing but the elapsed time from task - submission is less than the *archival max age*, it counts as - present (as we assume that it will be copied in the near - future). If the delay is elapsed (still with status ongoing), it - counts as a missing copy. -2. for each object to archive: - 1. retrieve current archive status for all destinations - 2. create a map noting where the object is present and where it can be copied - 3. Randomly choose couples (source, destination), where destinations are all - differents, to make enough copies -3. for each (content, source, destination): - 1. Join the contents by key (source, destination) to have a map - {(source, destination) -> [contents]} -4. for each (source, destination) -> contents - 1. for each content in contents, check its integrity on the source storage - * if the object is corrupted or missing - * update its status in the database - * remove it from the current contents list -5. start the copy of the batches by launching for each transfer tuple a copier - * if an error occurred on one of the content that should have been valid, - consider the whole batch as a failure. -6. set status=present and mtime=now for each successfully copied object - -Note that: - -* In case multiple jobs where tasked to archive the same overlapping - objects, step (1) might decide that some/all objects of this batch - no longer needs to be archived. - -* Due to parallelism, it is possible that the same objects will be - copied over at the same time by multiple workers. Also, the same - object could end up having more copies than the minimal number - required. - - -### Archiver copier - -The copier is run on demand by archiver workers, to transfer file batches from -a given source to a given destination. - -The copier transfers files one by one. The copying process is atomic with a file -granularity (i.e., individual files might be visible on the destination before -*all* files have been transferred) and ensures that *concurrent transfer of the -same files by multiple copier instances do not result in corrupted files*. Note -that, due to this and the fact that timestamps are updated by the worker, all -files copied in the same batch will have the same mtime even though the actual -file creation times on a given destination might differ. - -The copier is implemented using the ObjStorage API for the sources and -destinations. - - -DB structure ------------- - -Postgres SQL definitions for the archival status: - - -- Those names are sample of archives server names - CREATE TYPE archive_id AS ENUM ( - 'uffizi', - 'banco' - ); - - CREATE TABLE archive ( - id archive_id PRIMARY KEY, - url TEXT - ); - - CREATE TYPE archive_status AS ENUM ( - 'missing', - 'ongoing', - 'present', - 'corrupted' - ); - - CREATE TABLE content_archive ( - content_id sha1 unique, - copies jsonb - ); - - -Where the content_archive.copies field is of type jsonb. -It contains content's presence (or absence) in storages. 
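For illustration only, a `copies` value conforming to the JSON schema given just below might look like the following Python sketch; the archive names are the sample `uffizi` and `banco` servers from the enum above, and the timestamps are invented:

```python
# Illustrative sketch of a possible content_archive.copies document,
# keyed by archive name (the sample servers 'uffizi' and 'banco').
copies = {
    "uffizi": {
        "status": "present",    # one of: missing, ongoing, present, corrupted
        "mtime": 1514764800.0,  # timestamp of the last status change
    },
    "banco": {
        "status": "ongoing",    # copy to this destination is in progress
        "mtime": 1514764860.0,
    },
}
```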
-A content being represented by its signature (sha1) - - { - "$schema": "http://json-schema.org/schema#", - "title": "Copies data", - "description": "Data about the presence of a content into the storages", - "type": "object", - "Patternproperties": { - "^[a-zA-Z1-9]+$": { - "description": "archival status for the server named by key", - "type": "object", - "properties": { - "status": { - "description": "Archival status on this copy", - "type": "string", - "enum": ["missing", "ongoing", "present", "corrupted"] - }, - "mtime": { - "description": "Last time of the status update", - "type": "float" - } - } - } - }, - "additionalProperties": false - } diff --git a/docs/conf.py b/docs/conf.py deleted file mode 100644 index 190deb7..0000000 --- a/docs/conf.py +++ /dev/null @@ -1 +0,0 @@ -from swh.docs.sphinx.conf import * # NoQA diff --git a/docs/index.rst b/docs/index.rst deleted file mode 100644 index 27f5748..0000000 --- a/docs/index.rst +++ /dev/null @@ -1,17 +0,0 @@ -.. _swh-archiver: - -Software Heritage - Development Documentation -============================================= - -.. toctree:: - :maxdepth: 2 - :caption: Contents: - - - -Indices and tables -================== - -* :ref:`genindex` -* :ref:`modindex` -* :ref:`search` diff --git a/requirements-swh.txt b/requirements-swh.txt index 22b33fb..73822c8 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,6 +1,5 @@ -swh.core >= 0.0.37 -swh.journal >= 0.0.2 -swh.model >= 0.0.15 +swh.core >= 0.0.54 +swh.journal >= 0.0.5 +swh.model >= 0.0.27 swh.objstorage >= 0.0.17 -swh.scheduler >= 0.0.14 -swh.storage >= 0.0.102 +swh.scheduler >= 0.0.39 diff --git a/setup.py b/setup.py index dccd330..01c9ddc 100755 --- a/setup.py +++ b/setup.py @@ -1,29 +1,65 @@ #!/usr/bin/env python3 +# Copyright (C) 2015-2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information +import os from setuptools import setup, find_packages +from os import path +from io import open + +here = path.abspath(path.dirname(__file__)) + +# Get the long description from the README file +with open(path.join(here, 'README.md'), encoding='utf-8') as f: + long_description = f.read() + + +def parse_requirements(name=None): + if name: + reqf = 'requirements-%s.txt' % name + else: + reqf = 'requirements.txt' -def parse_requirements(): requirements = [] - for reqf in ('requirements.txt', 'requirements-swh.txt'): - with open(reqf) as f: - for line in f.readlines(): - line = line.strip() - if not line or line.startswith('#'): - continue - requirements.append(line) + if not os.path.exists(reqf): + return requirements + + with open(reqf) as f: + for line in f.readlines(): + line = line.strip() + if not line or line.startswith('#'): + continue + requirements.append(line) return requirements setup( name='swh.archiver', description='Software Heritage archiver', + long_description=long_description, + long_description_content_type='text/markdown', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DARC/', packages=find_packages(), - install_requires=parse_requirements(), + install_requires=parse_requirements() + parse_requirements('swh'), setup_requires=['vcversioner'], + extras_require={'testing': parse_requirements('test')}, vcversioner={}, include_package_data=True, + classifiers=[ + "Programming Language :: Python :: 3", + "Intended 
Audience :: Developers", + "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", + "Operating System :: OS Independent", + "Development Status :: 5 - Production/Stable", + ], + project_urls={ + 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', + 'Funding': 'https://www.softwareheritage.org/donate', + 'Source': 'https://forge.softwareheritage.org/source/swh-archiver', + }, ) diff --git a/sql/Makefile b/sql/Makefile deleted file mode 100644 index ab596ae..0000000 --- a/sql/Makefile +++ /dev/null @@ -1,42 +0,0 @@ -# Depends: postgresql-client, postgresql-autodoc - -DBNAME = softwareheritage-archiver-dev -DOCDIR = autodoc - -SQL_INIT = swh-init.sql -SQL_SCHEMA = swh-archiver-schema.sql -SQL_FUNC = swh-archiver-func.sql -SQL_DATA = swh-archiver-data.sql -SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_DATA) - -PSQL_BIN = psql -PSQL_FLAGS = --single-transaction --echo-all -X -PSQL = $(PSQL_BIN) $(PSQL_FLAGS) - - -all: - -createdb: createdb-stamp -createdb-stamp: $(SQL_INIT) - createdb $(DBNAME) - touch $@ - -filldb: filldb-stamp -filldb-stamp: createdb-stamp - cat $(SQLS) | $(PSQL) $(DBNAME) - touch $@ - -dropdb: - -dropdb $(DBNAME) - -dumpdb: swh-archiver.dump -swh.dump: filldb-stamp - pg_dump -Fc $(DBNAME) > $@ - -clean: - rm -rf *-stamp $(DOCDIR)/ - -distclean: clean dropdb - rm -f swh.dump - -.PHONY: all initdb createdb dropdb doc clean diff --git a/sql/createdb-stamp b/sql/createdb-stamp deleted file mode 100644 index e69de29..0000000 diff --git a/sql/filldb-stamp b/sql/filldb-stamp deleted file mode 100644 index e69de29..0000000 diff --git a/swh.archiver.egg-info/PKG-INFO b/swh.archiver.egg-info/PKG-INFO index 456ab87..b42171b 100644 --- a/swh.archiver.egg-info/PKG-INFO +++ b/swh.archiver.egg-info/PKG-INFO @@ -1,10 +1,71 @@ -Metadata-Version: 1.0 +Metadata-Version: 2.1 Name: swh.archiver -Version: 0.0.4 +Version: 0.0.6 Summary: Software Heritage archiver Home-page: https://forge.softwareheritage.org/diffusion/DARC/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Description: UNKNOWN +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-archiver +Description: swh-archiver + ============ + + Orchestrator in charge of guaranteeing that object storage content is pristine + and available in a sufficient amount of copies. + + Tests + ----- + + This module's tests need a postgres db to run. It is recommended to run + those but they can be skipped: + + - `make test`: will run all tests + - `make test-nodb`: will run only tests that do not need a local DB + - `make test-db`: will run only tests that do need a local DB + + If you do want to run DB-related tests, you should ensure you have access with + sufficient privileges to a Postgresql database. + + ### Using your system database + + You need to: + + - ensure that your user is authorized to create and drop DBs, and in particular + DBs named "softwareheritage-test" and "softwareheritage-dev" + + - ensure that you have the storage testdata repository checked out in + ../swh-storage-testdata + + ### Using pifpaf + + [pifpaf](https://github.com/jd/pifpaf) is a suite of fixtures and a + command-line tool that allows to start and stop daemons for a quick throw-away + usage. 
+ + It can be used to run tests that need a Postgres database without any other + configuration reauired nor the need to have special access to a running + database: + + ```bash + + $ pifpaf run postgresql make test-db + [snip] + ---------------------------------------------------------------------- + Ran 12 tests in 0.903s + + OK + ``` + + Note that pifpaf is not yet available as a Debian package, so you may have to + install it in a venv. + Platform: UNKNOWN +Classifier: Programming Language :: Python :: 3 +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) +Classifier: Operating System :: OS Independent +Classifier: Development Status :: 5 - Production/Stable +Description-Content-Type: text/markdown +Provides-Extra: testing diff --git a/swh.archiver.egg-info/SOURCES.txt b/swh.archiver.egg-info/SOURCES.txt index a8ca13b..8742897 100644 --- a/swh.archiver.egg-info/SOURCES.txt +++ b/swh.archiver.egg-info/SOURCES.txt @@ -1,57 +1,42 @@ -.gitignore MANIFEST.in Makefile Makefile.local -README.db_testing +README.md requirements-swh.txt requirements.txt setup.py version.txt -debian/changelog -debian/compat -debian/control -debian/copyright -debian/rules -debian/source/format -docs/.gitignore -docs/Makefile -docs/archiver-blueprint.md -docs/conf.py -docs/index.rst -docs/_static/.placeholder -docs/_templates/.placeholder sql/.gitignore -sql/Makefile -sql/createdb-stamp -sql/filldb-stamp -sql/swh-archiver-data.sql -sql/swh-archiver-func.sql -sql/swh-archiver-schema.sql -sql/swh-init.sql sql/bin/db-upgrade sql/upgrades/002.sql sql/upgrades/003.sql sql/upgrades/004.sql sql/upgrades/005.sql sql/upgrades/006.sql sql/upgrades/007.sql sql/upgrades/008.sql sql/upgrades/009.sql swh/__init__.py swh.archiver.egg-info/PKG-INFO swh.archiver.egg-info/SOURCES.txt swh.archiver.egg-info/dependency_links.txt swh.archiver.egg-info/requires.txt swh.archiver.egg-info/top_level.txt swh/archiver/__init__.py swh/archiver/checker.py swh/archiver/copier.py swh/archiver/db.py swh/archiver/director.py swh/archiver/storage.py swh/archiver/tasks.py swh/archiver/updater.py swh/archiver/worker.py +swh/archiver/sql/10-swh-init.sql +swh/archiver/sql/30-swh-schema.sql +swh/archiver/sql/40-swh-func.sql +swh/archiver/sql/50-swh-data.sql swh/archiver/tests/__init__.py +swh/archiver/tests/conftest.py swh/archiver/tests/test_archiver.py +swh/archiver/tests/test_archiver_storage.py swh/archiver/tests/test_checker.py \ No newline at end of file diff --git a/swh.archiver.egg-info/requires.txt b/swh.archiver.egg-info/requires.txt index 1fc6c64..4c9841d 100644 --- a/swh.archiver.egg-info/requires.txt +++ b/swh.archiver.egg-info/requires.txt @@ -1,10 +1,13 @@ click kafka-python psycopg2 -swh.core>=0.0.37 -swh.journal>=0.0.2 -swh.model>=0.0.15 -swh.objstorage>=0.0.17 -swh.scheduler>=0.0.14 -swh.storage>=0.0.102 vcversioner +swh.core>=0.0.54 +swh.journal>=0.0.5 +swh.model>=0.0.27 +swh.objstorage>=0.0.17 +swh.scheduler>=0.0.39 + +[testing] +pytest<4 +pytest-postgresql diff --git a/swh/archiver/checker.py b/swh/archiver/checker.py index 5131e2f..a3d39f3 100644 --- a/swh/archiver/checker.py +++ b/swh/archiver/checker.py @@ -1,271 +1,256 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import click import logging from swh.core import config from swh.objstorage import 
get_objstorage from swh.objstorage.exc import ObjNotFoundError, Error from .storage import ArchiverStorage class BaseContentChecker(config.SWHConfig, metaclass=abc.ABCMeta): """Abstract class of the content integrity checker. This checker's purpose is to iterate over the contents of a storage and check the integrity of each file. Behavior of the checker to deal with corrupted status will be specified by subclasses. You should override the DEFAULT_CONFIG and CONFIG_BASE_FILENAME variables if you need it. """ DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6' } }), 'batch_size': ('int', 1000), + 'log_tag': ('str', 'objstorage.checker') } CONFIG_BASE_FILENAME = 'objstorage/objstorage_checker' def __init__(self): """ Create a checker that ensure the objstorage have no corrupted file """ self.config = self.parse_config_file() self.objstorage = get_objstorage(**self.config['storage']) self.batch_size = self.config['batch_size'] + self.logger = logging.getLogger(self.config['log_tag']) def run_as_daemon(self): """ Start the check routine and perform it forever. Use this method to run the checker as a daemon that will iterate over the content forever in background. """ while True: try: self.run() - except: - pass + except Exception: + self.logger.exception('Unexpected error.') def run(self): """ Check a batch of content. """ for obj_id in self._get_content_to_check(self.batch_size): cstatus = self._check_content(obj_id) if cstatus == 'corrupted': self.corrupted_content(obj_id) elif cstatus == 'missing': self.missing_content(obj_id) def _get_content_to_check(self, batch_size): """ Get the content that should be verified. Returns: An iterable of the content's id that need to be checked. """ yield from self.objstorage.get_random(batch_size) def _check_content(self, obj_id): """ Check the validity of the given content. Returns: True if the content was valid, false if it was corrupted. """ try: self.objstorage.check(obj_id) except ObjNotFoundError: return 'missing' except Error: return 'corrupted' @abc.abstractmethod def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ raise NotImplementedError("%s must implement " "'corrupted_content' method" % type(self)) @abc.abstractmethod def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ raise NotImplementedError("%s must implement " "'missing_content' method" % type(self)) class LogContentChecker(BaseContentChecker): """ Content integrity checker that just log detected errors. """ - - DEFAULT_CONFIG = { - 'storage': ('dict', { - 'cls': 'pathslicing', - 'args': { - 'root': '/srv/softwareheritage/objects', - 'slicing': '0:2/2:4/4:6' - } - }), - 'batch_size': ('int', 1000), - 'log_tag': ('str', 'objstorage.checker') - } - CONFIG_BASE_FILENAME = 'objstorage/log_checker' - def __init__(self): - super().__init__() - self.logger = logging.getLogger(self.config['log_tag']) - def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ self.logger.error('Content %s is corrupted' % obj_id) def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ self.logger.error('Content %s is detected missing' % obj_id) class RepairContentChecker(LogContentChecker): """ Content integrity checker that will try to restore contents. 
""" DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6' } }), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), 'backup_storages': ('dict', { 'banco': { 'cls': 'remote', 'args': {'url': 'http://banco:5003/'} } }) } CONFIG_BASE_FILENAME = 'objstorage/repair_checker' def __init__(self): super().__init__() self.backups = [ get_objstorage(**storage) for name, storage in self.config['backup_storages'].items() ] def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ super().corrupted_content(obj_id) self._restore(obj_id) def missing_content(self, obj_id): """ Perform an action to treat with a missing content. """ super().missing_content(obj_id) self._restore(obj_id) def _restore(self, obj_id): if not self._perform_restore(obj_id): # Object could not be restored self.logger.critical( 'Object %s is corrupted and could not be repaired' % obj_id ) def _perform_restore(self, obj_id): """ Try to restore the object in the current storage using the backups """ for backup in self.backups: try: content = backup.get(obj_id) self.objstorage.restore(content, obj_id) - except ObjNotFoundError as e: + except ObjNotFoundError: continue else: # Return True direclty when a backup contains the object return True # No backup contains the object return False class ArchiveNotifierContentChecker(LogContentChecker): """ Implementation of the checker that will update the archiver database Once the database is updated the archiver may restore the content on it's next scheduling as it won't be present anymore, and this status change will probably make the retention policy invalid. """ DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6' } }), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), 'storage_name': ('str', 'banco'), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev') } CONFIG_BASE_FILENAME = 'objstorage/archive_notifier_checker' def __init__(self): super().__init__() self.archiver_db = ArchiverStorage(self.config['dbconn']) self.storage_name = self.config['storage_name'] def corrupted_content(self, obj_id): """ Perform an action to treat with a corrupted content. """ super().corrupted_content(obj_id) self._update_status(obj_id, 'corrupted') def missing_content(self, obj_id): """ Perform an action to treat with a missing content. 
""" super().missing_content(obj_id) self._update_status(obj_id, 'missing') def _update_status(self, obj_id, status): self.archiver_db.content_archive_update(obj_id, self.storage_name, new_status=status) @click.command() @click.argument('checker-type', required=1, default='log') @click.option('--daemon/--nodaemon', default=True, help='Indicates if the checker should run forever ' 'or on a single batch of content') def launch(checker_type, daemon): types = { 'log': LogContentChecker, 'repair': RepairContentChecker, 'archiver_notifier': ArchiveNotifierContentChecker } checker = types[checker_type]() if daemon: checker.run_as_daemon() else: checker.run() if __name__ == '__main__': launch() diff --git a/swh/archiver/db.py b/swh/archiver/db.py index e07bf89..a96779e 100644 --- a/swh/archiver/db.py +++ b/swh/archiver/db.py @@ -1,224 +1,228 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure +from swh.core.db import BaseDb +from swh.core.db.db_utils import stored_procedure def utcnow(): return datetime.datetime.now(tz=datetime.timezone.utc) class ArchiverDb(BaseDb): """Proxy to the SWH's archiver DB """ def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ cur = self._cursor(cur) cur.execute("SELECT * FROM archive") - yield from cursor_to_bytes(cur) + yield from cur def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content in a specific server. Retrieve from the database the archival status of the given content in the given archive server. Args: content_id: the sha1 of the content. Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """select archive.name, status, mtime from content_copies left join archive on content_copies.archive_id = archive.id where content_copies.content_id = ( select id from content where sha1 = %s) """ cur = self._cursor(cur) cur.execute(query, (content_id,)) rows = cur.fetchall() if not rows: return None present = [] ongoing = {} for archive, status, mtime in rows: if status == 'present': present.append(archive) elif status == 'ongoing': ongoing[archive] = mtime return (content_id, present, ongoing) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. 
""" vars = { 'limit': limit, } if last_content is None: last_content_clause = 'true' else: last_content_clause = """content_id > ( select id from content where sha1 = %(last_content)s)""" vars['last_content'] = last_content query = """select (select sha1 from content where id = content_id), array_agg((select name from archive where id = archive_id)) from content_copies where status = 'present' and %s group by content_id order by content_id limit %%(limit)s""" % last_content_clause cur = self._cursor(cur) cur.execute(query, vars) - for content_id, present in cursor_to_bytes(cur): + for content_id, present in cur: yield (content_id, present, {}) def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ vars = { 'limit': limit, 'retention_policy': retention_policy, } if last_content is None: last_content_clause = 'true' else: last_content_clause = """content_id > ( select id from content where sha1 = %(last_content)s)""" vars['last_content'] = last_content query = """select (select sha1 from content where id = content_id), array_agg((select name from archive where id = archive_id)) from content_copies where status = 'present' and %s group by content_id having count(archive_id) < %%(retention_policy)s order by content_id limit %%(limit)s""" % last_content_clause cur = self._cursor(cur) cur.execute(query, vars) - for content_id, present in cursor_to_bytes(cur): + for content_id, present in cur: yield (content_id, present, {}) @stored_procedure('swh_mktemp_content_archive') def mktemp_content_archive(self, cur=None): """Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction. """ pass @stored_procedure('swh_content_archive_add') def content_archive_add_from_temp(self, cur=None): """Add new content archive entries from temporary table. - Use from archiver.storage module:: - db.mktemp_content_archive(cur) - # copy data over to the temp table - db.copy_to([{'colname': id0}, {'colname': id1}], - 'tmp_cache_content', - ['colname'], cur) - # insert into the main table - db.add_content_archive_from_temp(cur) + Use from archiver.storage module: + + .. code-block:: python + + db.mktemp_content_archive(cur) + # copy data over to the temp table + db.copy_to([{'colname': id0}, {'colname': id1}], + 'tmp_cache_content', + ['colname'], cur) + # insert into the main table + db.add_content_archive_from_temp(cur) """ pass def content_archive_get_missing(self, backend_name, cur=None): """Retrieve the content missing from backend_name. """ cur = self._cursor(cur) cur.execute("select * from swh_content_archive_missing(%s)", (backend_name,)) - yield from cursor_to_bytes(cur) + yield from cur def content_archive_get_unknown(self, cur=None): """Retrieve unknown sha1 from archiver db. 
""" cur = self._cursor(cur) cur.execute('select * from swh_content_archive_unknown()') - yield from cursor_to_bytes(cur) + yield from cur def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ assert isinstance(content_id, bytes) assert new_status is not None query = """insert into content_copies (archive_id, content_id, status, mtime) values ((select id from archive where name=%s), (select id from content where sha1=%s), %s, %s) on conflict (archive_id, content_id) do update set status = excluded.status, mtime = excluded.mtime """ cur = self._cursor(cur) cur.execute(query, (archive_id, content_id, new_status, utcnow())) diff --git a/sql/swh-init.sql b/swh/archiver/sql/10-swh-init.sql similarity index 100% rename from sql/swh-init.sql rename to swh/archiver/sql/10-swh-init.sql diff --git a/sql/swh-archiver-schema.sql b/swh/archiver/sql/30-swh-schema.sql similarity index 100% rename from sql/swh-archiver-schema.sql rename to swh/archiver/sql/30-swh-schema.sql diff --git a/sql/swh-archiver-func.sql b/swh/archiver/sql/40-swh-func.sql similarity index 100% rename from sql/swh-archiver-func.sql rename to swh/archiver/sql/40-swh-func.sql diff --git a/sql/swh-archiver-data.sql b/swh/archiver/sql/50-swh-data.sql similarity index 100% rename from sql/swh-archiver-data.sql rename to swh/archiver/sql/50-swh-data.sql diff --git a/swh/archiver/storage.py b/swh/archiver/storage.py index f7140ce..ded5103 100644 --- a/swh/archiver/storage.py +++ b/swh/archiver/storage.py @@ -1,358 +1,354 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import psycopg2 import time from .db import ArchiverDb from swh.model import hashutil -from swh.storage.common import db_transaction_generator, db_transaction -from swh.storage.exc import StorageDBError +from swh.core.db.common import db_transaction_generator, db_transaction class ArchiverStorage(): """SWH Archiver storage proxy, encompassing DB """ def __init__(self, dbconn): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ - try: - if isinstance(dbconn, psycopg2.extensions.connection): - self._db = ArchiverDb(dbconn) - else: - self._db = ArchiverDb.connect(dbconn) - except psycopg2.OperationalError as e: - raise StorageDBError(e) + if isinstance(dbconn, psycopg2.extensions.connection): + self._db = ArchiverDb(dbconn) + else: + self._db = ArchiverDb.connect(dbconn) def get_db(self): return self._db @db_transaction_generator() def archive_ls(self, db=None, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ yield from db.archive_ls(cur) @db_transaction() def content_archive_get(self, content_id, db=None, cur=None): """ Get the archival status of a content. 
Retrieve from the database the archival status of the given content Args: content_id: the sha1 of the content Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ return db.content_archive_get(content_id, cur) @db_transaction_generator() def content_archive_get_copies(self, last_content=None, limit=1000, db=None, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from db.content_archive_get_copies(last_content, limit, cur) @db_transaction_generator() def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, db=None, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from db.content_archive_get_unarchived_copies( retention_policy, last_content, limit, cur) @db_transaction_generator() def content_archive_get_missing(self, content_ids, backend_name, db=None, cur=None): """Retrieve missing sha1s from source_name. Args: content_ids ([sha1s]): list of sha1s to test source_name (str): Name of the backend to check for content Yields: missing sha1s from backend_name """ db.mktemp_content_archive(cur) db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_missing(backend_name, cur): yield content_id[0] @db_transaction_generator() def content_archive_get_unknown(self, content_ids, db=None, cur=None): """Retrieve unknown sha1s from content_archive. Args: content_ids ([sha1s]): list of sha1s to test Yields: Unknown sha1s from content_archive """ db.mktemp_content_archive(cur) db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_unknown(cur): yield content_id[0] @db_transaction() def content_archive_update(self, content_id, archive_id, new_status=None, db=None, cur=None): """ Update the status of an archive content and set its mtime to now Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ db.content_archive_update(content_id, archive_id, new_status, cur) @db_transaction() def content_archive_add( self, content_ids, sources_present, db=None, cur=None): """Insert a new entry in db about content_id. 
Args: content_ids ([bytes|str]): content identifiers sources_present ([str]): List of source names where contents are present """ # Prepare copies dictionary copies = {} for source in sources_present: copies[source] = { "status": "present", "mtime": int(time.time()), } copies = json.dumps(copies) num_present = len(sources_present) db.mktemp('content_archive', cur) db.copy_to( ({'content_id': id, 'copies': copies, 'num_present': num_present} for id in content_ids), 'tmp_content_archive', ['content_id', 'copies', 'num_present'], cur) db.content_archive_add_from_temp(cur) class StubArchiverStorage(): def __init__(self, archives, present, missing, logfile_base): """ A stub storage for the archiver that doesn't write to disk Args: - archives: a dictionary mapping archive names to archive URLs - present: archives where the objects are all considered present - missing: archives where the objects are all considered missing - logfile_base: basename for the logfile """ self.archives = archives self.present = set(present) self.missing = set(missing) if set(archives) != self.present | self.missing: raise ValueError("Present and missing archives don't match") self.logfile_base = logfile_base self.__logfile = None def open_logfile(self): if self.__logfile: return logfile_name = "%s.%d" % (self.logfile_base, os.getpid()) self.__logfile = open(logfile_name, 'a') def close_logfile(self): if not self.__logfile: return self.__logfile.close() self.__logfile = None def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ yield from self.archives.items() def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content. Retrieve from the database the archival status of the given content Args: content_id: the sha1 of the content Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ return (content_id, self.present, {}) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from [] def content_archive_get_unarchived_copies(self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from [] def content_archive_get_missing(self, content_ids, backend_name, cur=None): """Retrieve missing sha1s from source_name. 
Args: content_ids ([sha1s]): list of sha1s to test source_name (str): Name of the backend to check for content Yields: missing sha1s from backend_name """ if backend_name in self.missing: yield from content_ids elif backend_name in self.present: yield from [] else: raise ValueError('Unknown backend `%s`' % backend_name) def content_archive_get_unknown(self, content_ids, cur=None): """Retrieve unknown sha1s from content_archive. Args: content_ids ([sha1s]): list of sha1s to test Yields: Unknown sha1s from content_archive """ yield from [] def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to now Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ if not self.__logfile: self.open_logfile() print(time.time(), archive_id, new_status, hashutil.hash_to_hex(content_id), file=self.__logfile) def content_archive_add( self, content_ids, sources_present, cur=None): """Insert a new entry in db about content_id. Args: content_ids ([bytes|str]): content identifiers sources_present ([str]): List of source names where contents are present """ pass def get_archiver_storage(cls, args): """Instantiate an archiver database with the proper class and arguments""" if cls == 'db': return ArchiverStorage(**args) elif cls == 'stub': return StubArchiverStorage(**args) else: raise ValueError('Unknown Archiver Storage class `%s`' % cls) diff --git a/swh/archiver/tasks.py b/swh/archiver/tasks.py index ccb0a2f..7c9b2fc 100644 --- a/swh/archiver/tasks.py +++ b/swh/archiver/tasks.py @@ -1,28 +1,21 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.scheduler.task import Task +from celery import current_app as app + from .worker import ArchiverWithRetentionPolicyWorker from .worker import ArchiverToBackendWorker -class SWHArchiverWithRetentionPolicyTask(Task): - """Main task that archive a batch of content. - - """ - task_queue = 'swh_storage_archive_worker' - - def run_task(self, *args, **kwargs): - ArchiverWithRetentionPolicyWorker(*args, **kwargs).run() +@app.task(name=__name__ + '.SWHArchiverWithRetentionPolicyTask') +def archive_with_retention(*args, **kwargs): + ArchiverWithRetentionPolicyWorker(*args, **kwargs).run() -class SWHArchiverToBackendTask(Task): +@app.task(name=__name__ + '.SWHArchiverToBackendTask') +def archive_to_backend(*args, **kwargs): """Main task that archive a batch of content in the cloud. 
- """ - task_queue = 'swh_storage_archive_worker_to_backend' - - def run_task(self, *args, **kwargs): - ArchiverToBackendWorker(*args, **kwargs).run() + ArchiverToBackendWorker(*args, **kwargs).run() diff --git a/swh/archiver/tests/__init__.py b/swh/archiver/tests/__init__.py index e69de29..dc8015f 100644 --- a/swh/archiver/tests/__init__.py +++ b/swh/archiver/tests/__init__.py @@ -0,0 +1,5 @@ +from os import path +import swh.archiver + + +SQL_DIR = path.join(path.dirname(swh.archiver.__file__), 'sql') diff --git a/swh/archiver/tests/conftest.py b/swh/archiver/tests/conftest.py new file mode 100644 index 0000000..91d0d36 --- /dev/null +++ b/swh/archiver/tests/conftest.py @@ -0,0 +1,151 @@ +import os +import glob +import tempfile +import shutil + +import pytest + +from swh.core.utils import numfile_sortkey as sortkey +from swh.objstorage import get_objstorage +from swh.scheduler.tests.conftest import * # noqa +from swh.archiver.storage import get_archiver_storage +from swh.archiver import (ArchiverWithRetentionPolicyDirector, + ArchiverWithRetentionPolicyWorker) +from swh.archiver.tests import SQL_DIR + + +DUMP_FILES = os.path.join(SQL_DIR, '*.sql') + + +@pytest.fixture(scope='session') +def celery_includes(): + return [ + 'swh.archiver.tasks', + ] + + +@pytest.fixture +def swh_archiver_db(postgresql): + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) + + cursor = postgresql.cursor() + for fname in all_dump_files: + with open(fname) as fobj: + cursor.execute(fobj.read()) + postgresql.commit() + return postgresql + + +@pytest.fixture +def swh_archiver(swh_archiver_db): + + # Create source storage + src_root = tempfile.mkdtemp() + src_config = { + 'cls': 'pathslicing', + 'args': { + 'root': src_root, + 'slicing': '0:2/2:4/4:6' + } + } + src_storage = get_objstorage(**src_config) + + dest_root = tempfile.mkdtemp() + dest_config = { + 'cls': 'pathslicing', + 'args': { + 'root': dest_root, + 'slicing': '0:2/2:4/4:6', + } + } + dest_storage = get_objstorage(**dest_config) + + # Keep mapped the id to the storages + storages = { + 'src': src_storage, # uffizi + 'dest': dest_storage # banco + } + cursor = swh_archiver_db.cursor() + for storage in storages: + cursor.execute("INSERT INTO archive(name) VALUES(%s)", (storage,)) + swh_archiver_db.commit() + + # Override configurations + src_archiver_conf = {'host': 'src'} + dest_archiver_conf = {'host': 'dest'} + src_archiver_conf.update(src_config) + dest_archiver_conf.update(dest_config) + archiver_storages = [src_archiver_conf, dest_archiver_conf] + + def parse_config_file(obj, additional_configs): + return { # noqa + 'archiver_storage': { + 'cls': 'db', + 'args': { + 'dbconn': swh_archiver_db, + }, + }, + 'retention_policy': 2, + 'archival_max_age': 3600, + 'batch_max_size': 5000, + 'asynchronous': False, + 'max_queue_length': 100000, + 'queue_throttling_delay': 120, + } + + orig_director_cfg = ArchiverWithRetentionPolicyDirector.parse_config_file + ArchiverWithRetentionPolicyDirector.parse_config_file = ( + parse_config_file) + + def parse_config_file(obj, additional_configs): + return { # noqa + 'archiver_storage': { + 'cls': 'db', + 'args': { + 'dbconn': swh_archiver_db, + }, + }, + 'retention_policy': 2, + 'archival_max_age': 3600, + 'storages': archiver_storages, + 'source': 'src', + 'sources': ['src'], + } + orig_worker_cfg = ArchiverWithRetentionPolicyWorker.parse_config_file + ArchiverWithRetentionPolicyWorker.parse_config_file = ( + parse_config_file) + + # Create the base archiver + archiver = 
ArchiverWithRetentionPolicyDirector(start_id=None) + try: + yield archiver, storages + finally: + ArchiverWithRetentionPolicyDirector.parse_config_file = ( + orig_director_cfg) + ArchiverWithRetentionPolicyWorker.parse_config_file = ( + orig_worker_cfg) + shutil.rmtree(src_root) + shutil.rmtree(dest_root) + + +@pytest.fixture +def swh_archiver_storage(swh_archiver): + + log_root = tempfile.mkdtemp() + + config = { + 'cls': 'stub', + 'args': { + 'archives': { + 'present_archive': 'http://src:5003', + 'missing_archive': 'http://dest:5003', + }, + 'present': ['present_archive'], + 'missing': ['missing_archive'], + 'logfile_base': os.path.join(log_root, 'log_'), + } + } + try: + yield get_archiver_storage(**config) + finally: + shutil.rmtree(log_root) diff --git a/swh/archiver/tests/test_archiver.py b/swh/archiver/tests/test_archiver.py index fa64a2e..6cb006b 100644 --- a/swh/archiver/tests/test_archiver.py +++ b/swh/archiver/tests/test_archiver.py @@ -1,453 +1,212 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import glob -import tempfile -import shutil -import unittest -import os +from contextlib import contextmanager +import pytest -from nose.tools import istest -from nose.plugins.attrib import attr +from swh.archiver import ArchiverWithRetentionPolicyWorker +from swh.archiver.db import utcnow +from swh.objstorage.exc import ObjNotFoundError -from swh.core.tests.db_testing import SingleDbTestFixture -from swh.archiver.storage import get_archiver_storage +def add_content(cursor, storage, content_data): + """ Add really a content to the given objstorage -from swh.archiver import ArchiverWithRetentionPolicyDirector -from swh.archiver import ArchiverWithRetentionPolicyWorker -from swh.archiver.db import utcnow + This put an empty status for the added content. -from swh.objstorage import get_objstorage -from swh.objstorage.exc import ObjNotFoundError + Args: + storage_name: the concerned storage + content_data: the data to insert + with_row_insert: to insert a row entry in the db or not + + """ + # Add the content to the storage + obj_id = storage.add(content_data) + cursor.execute(""" INSERT INTO content (sha1) + VALUES (%s) + """, (obj_id,)) + return obj_id + + +def update_status(cursor, archiver, obj_id, storage_name, status, date=None): + """ Update the db status for the given id/storage_name. + + This does not create the content in the storage. + """ + cursor.execute("""insert into archive (name) + values (%s) + on conflict do nothing""", (storage_name,)) + + archiver.archiver_storage.content_archive_update( + obj_id, storage_name, status + ) + + +# Integration test +def test_archive_missing_content(swh_archiver_db, swh_archiver): + """ Run archiver on a missing content should archive it. 
+ """ + archiver, storages = swh_archiver + cursor = swh_archiver_db.cursor() + obj_data = b'archive_missing_content' + obj_id = add_content(cursor, storages['src'], obj_data) + + update_status(cursor, archiver, obj_id, 'src', 'present') + # Content is missing on dest (entry not present in the db) + with pytest.raises(ObjNotFoundError): + storages['dest'].get(obj_id) -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') + archiver.run() + # now the content should be present on remote objstorage + remote_data = storages['dest'].get(obj_id) + assert obj_data == remote_data -@attr('db') -class TestArchiver(SingleDbTestFixture, unittest.TestCase): - """ Test the objstorage archiver. +def test_archive_present_content(swh_archiver_db, swh_archiver): + """ A content that is not 'missing' shouldn't be archived. """ + archiver, storages = swh_archiver + cursor = swh_archiver_db.cursor() + obj_data = b'archive_present_content' + obj_id = add_content(cursor, storages['src'], obj_data) + + update_status(cursor, archiver, obj_id, 'src', 'present') + update_status(cursor, archiver, obj_id, 'dest', 'present') + + # After the run, the content should NOT be in the archive. + # As the archiver believe it was already in. + archiver.run() + with pytest.raises(ObjNotFoundError): + storages['dest'].get(obj_id) - TEST_DB_NAME = 'softwareheritage-archiver-test' - TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump') - TEST_DB_DUMP_TYPE = 'pg_dump' - - def setUp(self): - # Launch the backup server - super().setUp() - - # Create source storage - self.src_root = tempfile.mkdtemp() - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - self.dest_root = tempfile.mkdtemp(prefix='remote') - dest_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6', - } - } - self.dest_storage = get_objstorage(**dest_config) - - # Keep mapped the id to the storages - self.storages = { - 'uffizi': self.src_storage, - 'banco': self.dest_storage - } - - # Override configurations - src_archiver_conf = {'host': 'uffizi'} - dest_archiver_conf = {'host': 'banco'} - src_archiver_conf.update(src_config) - dest_archiver_conf.update(dest_config) - self.archiver_storages = [src_archiver_conf, dest_archiver_conf] - self._override_director_config() - self._override_worker_config() - # Create the base archiver - self.archiver = self._create_director() - - def tearDown(self): - self.empty_tables() - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - super().tearDown() - - def empty_tables(self): - # Remove all content - self.cursor.execute('DELETE FROM content') - self.cursor.execute('DELETE FROM content_copies') - self.conn.commit() - - def _override_director_config(self, retention_policy=2): - """ Override the default config of the Archiver director - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. 
- """ - ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'batch_max_size': 5000, - 'archival_max_age': 3600, - 'retention_policy': retention_policy, - 'asynchronous': False, - 'max_queue_length': 100000, - 'queue_throttling_delay': 120, - } - - def _override_worker_config(self): - """ Override the default config of the Archiver worker - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. - """ - ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa - 'retention_policy': 2, - 'archival_max_age': 3600, - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'storages': self.archiver_storages, - 'source': 'uffizi', - 'sources': ['uffizi'], - } - - def _create_director(self): - return ArchiverWithRetentionPolicyDirector(start_id=None) - - def _create_worker(self, batch={}): - return ArchiverWithRetentionPolicyWorker(batch) - - def _add_content(self, storage_name, content_data): - """ Add really a content to the given objstorage - - This put an empty status for the added content. - - Args: - storage_name: the concerned storage - content_data: the data to insert - with_row_insert: to insert a row entry in the db or not - - """ - # Add the content to the storage - obj_id = self.storages[storage_name].add(content_data) - self.cursor.execute(""" INSERT INTO content (sha1) - VALUES (%s) - """, (obj_id,)) - return obj_id - - def _update_status(self, obj_id, storage_name, status, date=None): - """ Update the db status for the given id/storage_name. - - This does not create the content in the storage. - """ - self.cursor.execute("""insert into archive (name) - values (%s) - on conflict do nothing""", (storage_name,)) - - self.archiver.archiver_storage.content_archive_update( - obj_id, storage_name, status - ) - - # Integration test - @istest - def archive_missing_content(self): - """ Run archiver on a missing content should archive it. - """ - obj_data = b'archive_missing_content' - obj_id = self._add_content('uffizi', obj_data) - self._update_status(obj_id, 'uffizi', 'present') - # Content is missing on banco (entry not present in the db) - try: - self.dest_storage.get(obj_id) - except ObjNotFoundError: - pass - else: - self.fail('Content should not be present before archival') - self.archiver.run() - # now the content should be present on remote objstorage - remote_data = self.dest_storage.get(obj_id) - self.assertEquals(obj_data, remote_data) - - @istest - def archive_present_content(self): - """ A content that is not 'missing' shouldn't be archived. - """ - obj_id = self._add_content('uffizi', b'archive_present_content') - self._update_status(obj_id, 'uffizi', 'present') - self._update_status(obj_id, 'banco', 'present') - # After the run, the content should NOT be in the archive. - # As the archiver believe it was already in. - self.archiver.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - @istest - def archive_already_enough(self): - """ A content missing with enough copies shouldn't be archived. 
- """ - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self._override_director_config(retention_policy=1) - director = self._create_director() + +@contextmanager +def override_config(obj, **kw): + orig = obj.config.copy() + obj.config.update(kw) + try: + yield + finally: + obj.config = orig + + +def test_archive_already_enough(swh_archiver_db, swh_archiver): + """ A content missing with enough copies shouldn't be archived. + """ + archiver, storages = swh_archiver + cursor = swh_archiver_db.cursor() + obj_data = b'archive_alread_enough' + obj_id = add_content(cursor, storages['src'], obj_data) + + update_status(cursor, archiver, obj_id, 'src', 'present') + + with override_config(archiver, retention_policy=1): # Obj is present in only one archive but only one copy is required. - director.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - @istest - def content_archive_get_copies(self): - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [], - ) - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [(obj_id, ['uffizi'], {})], - ) - - # Unit tests for archive worker - - def archival_elapsed(self, mtime): - return self._create_worker()._is_archival_delay_elapsed(mtime) - - @istest - def vstatus_ongoing_remaining(self): - self.assertFalse(self.archival_elapsed(utcnow())) - - @istest - def vstatus_ongoing_elapsed(self): - past_time = (utcnow() - - datetime.timedelta( - seconds=self._create_worker().archival_max_age)) - self.assertTrue(self.archival_elapsed(past_time)) - - @istest - def need_archival_missing(self): - """ A content should need archival when it is missing. 
- """ - status_copies = {'present': ['uffizi'], 'missing': ['banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - True) - - @istest - def need_archival_present(self): - """ A content present everywhere shouldn't need archival - """ - status_copies = {'present': ['uffizi', 'banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - False) - - def _compute_copies_status(self, status): - """ A content with a given status should be detected correctly - """ - obj_id = self._add_content( - 'banco', b'compute_copies_' + bytes(status, 'utf8')) - self._update_status(obj_id, 'banco', status) - worker = self._create_worker() - self.assertIn('banco', worker.compute_copies( - set(worker.objstorages), obj_id)[status]) - - @istest - def compute_copies_present(self): - """ A present content should be detected with correct status - """ - self._compute_copies_status('present') - - @istest - def compute_copies_missing(self): - """ A missing content should be detected with correct status - """ - self._compute_copies_status('missing') - - @istest - def compute_copies_extra_archive(self): - obj_id = self._add_content('banco', b'foobar') - self._update_status(obj_id, 'banco', 'present') - self._update_status(obj_id, 'random_archive', 'present') - worker = self._create_worker() - copies = worker.compute_copies(set(worker.objstorages), obj_id) - self.assertEqual(copies['present'], {'banco'}) - self.assertEqual(copies['missing'], {'uffizi'}) - - def _get_backups(self, present, missing): - """ Return a list of the pair src/dest from the present and missing - """ - worker = self._create_worker() - return list(worker.choose_backup_servers(present, missing)) - - @istest - def choose_backup_servers(self): - self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) - self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) - # Even with more possible destinations, do not take more than the - # retention_policy require - self.assertEqual( - len(self._get_backups(['uffizi'], ['banco', 's3'])), - 1 - ) - - -class TestArchiverStorageStub(unittest.TestCase): - def setUp(self): - self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') - self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') - self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') - - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - # Create destination storage - dest_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.dest_storage = get_objstorage(**dest_config) - - self.config = { - 'cls': 'stub', - 'args': { - 'archives': { - 'present_archive': 'http://uffizi:5003', - 'missing_archive': 'http://banco:5003', - }, - 'present': ['present_archive'], - 'missing': ['missing_archive'], - 'logfile_base': os.path.join(self.log_root, 'log_'), - } - } - - # Generated with: - # - # id_length = 20 - # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') - # - self.content_ids = [ - b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", - b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', - b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', - b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', - b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', - ] - - self.archiver_storage = 
get_archiver_storage(**self.config) - super().setUp() - - def tearDown(self): - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - shutil.rmtree(self.log_root) - super().tearDown() - - @istest - def archive_ls(self): - self.assertCountEqual( - self.archiver_storage.archive_ls(), - self.config['args']['archives'].items() - ) - - @istest - def content_archive_get(self): - for content_id in self.content_ids: - self.assertEqual( - self.archiver_storage.content_archive_get(content_id), - (content_id, set(self.config['args']['present']), {}), - ) - - @istest - def content_archive_get_copies(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_copies(), - [], - ) - - @istest - def content_archive_get_unarchived_copies(self): - retention_policy = 2 - self.assertCountEqual( - self.archiver_storage.content_archive_get_unarchived_copies( - retention_policy), - [], - ) - - @istest - def content_archive_get_missing(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'missing_archive' - ), - self.content_ids, - ) - - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'present_archive' - ), - [], - ) - - with self.assertRaises(ValueError): - list(self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'unknown_archive' - )) - - @istest - def content_archive_get_unknown(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_unknown( - self.content_ids, - ), - [], - ) - - @istest - def content_archive_update(self): - for content_id in self.content_ids: - self.archiver_storage.content_archive_update( - content_id, 'present_archive', 'present') - self.archiver_storage.content_archive_update( - content_id, 'missing_archive', 'present') - - self.archiver_storage.close_logfile() - - # Make sure we created a logfile - files = glob.glob('%s*' % self.config['args']['logfile_base']) - self.assertEqual(len(files), 1) - - # make sure the logfile contains all our lines - lines = open(files[0]).readlines() - self.assertEqual(len(lines), 2 * len(self.content_ids)) + archiver.run() + with pytest.raises(ObjNotFoundError): + storages['dest'].get(obj_id) + + +def test_content_archive_get_copies(swh_archiver_db, swh_archiver): + archiver, storages = swh_archiver + assert not list(archiver.archiver_storage.content_archive_get_copies()) + + cursor = swh_archiver_db.cursor() + obj_id = add_content(cursor, storages['src'], b'archive_alread_enough') + update_status(cursor, archiver, obj_id, 'src', 'present') + assert list(archiver.archiver_storage.content_archive_get_copies()) == \ + [(obj_id, ['src'], {})] + + +# Unit tests for archive worker + +def create_worker(batch={}): + return ArchiverWithRetentionPolicyWorker(batch) + + +def archival_elapsed(mtime): + return create_worker()._is_archival_delay_elapsed(mtime) + + +def test_vstatus_ongoing_remaining(swh_archiver): + assert not archival_elapsed(utcnow()) + + +def test_vstatus_ongoing_elapsed(swh_archiver): + past_time = (utcnow() + - datetime.timedelta( + seconds=create_worker().archival_max_age)) + assert archival_elapsed(past_time) + + +def test_need_archival_missing(swh_archiver): + """ A content should need archival when it is missing. 
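+
+    The 'uffizi'/'banco' names here are arbitrary: ``need_archival`` only
+    counts copies against the retention policy, so they do not have to
+    match the 'src'/'dest' storages set up by the fixtures.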
+ """ + status_copies = {'present': ['uffizi'], 'missing': ['banco']} + worker = create_worker() + assert worker.need_archival(status_copies) is True + + +def test_need_archival_present(swh_archiver): + """ A content present everywhere shouldn't need archival + """ + status_copies = {'present': ['uffizi', 'banco']} + worker = create_worker() + assert worker.need_archival(status_copies) is False + + +def compute_copies_status(cursor, archiver, storage, status): + """ A content with a given status should be detected correctly + """ + obj_id = add_content( + cursor, storage, b'compute_copies_' + bytes(status, 'utf8')) + update_status(cursor, archiver, obj_id, 'dest', status) + worker = create_worker() + assert 'dest' in worker.compute_copies( + set(worker.objstorages), obj_id)[status] + + +def test_compute_copies_present(swh_archiver, swh_archiver_db): + """ A present content should be detected with correct status + """ + archiver, storages = swh_archiver + cursor = swh_archiver_db.cursor() + compute_copies_status(cursor, archiver, storages['dest'], 'present') + + +def test_compute_copies_missing(swh_archiver, swh_archiver_db): + """ A missing content should be detected with correct status + """ + archiver, storages = swh_archiver + cursor = swh_archiver_db.cursor() + compute_copies_status(cursor, archiver, storages['dest'], 'missing') + + +def test_compute_copies_extra_archive(swh_archiver, swh_archiver_db): + archiver, storages = swh_archiver + cursor = swh_archiver_db.cursor() + obj_id = add_content(cursor, storages['dest'], b'foobar') + update_status(cursor, archiver, obj_id, 'dest', 'present') + update_status(cursor, archiver, obj_id, 'random_archive', 'present') + worker = create_worker() + copies = worker.compute_copies(set(worker.objstorages), obj_id) + assert copies['present'] == {'dest'} + assert copies['missing'] == {'src'} + + +def get_backups(present=(), missing=()): + """ Return a list of the pair src/dest from the present and missing + """ + worker = create_worker() + return list(worker.choose_backup_servers(present, missing)) + + +def test_choose_backup_servers(swh_archiver, swh_archiver_db): + assert len(get_backups(['src', 'dest'])) == 0 + assert len(get_backups(['src'], ['dest'])) == 1 + # Even with more possible destinations, do not take more than the + # retention_policy require + assert len(get_backups(['src'], ['dest', 's3'])) == 1 diff --git a/swh/archiver/tests/test_archiver_storage.py b/swh/archiver/tests/test_archiver_storage.py new file mode 100644 index 0000000..4c9e09d --- /dev/null +++ b/swh/archiver/tests/test_archiver_storage.py @@ -0,0 +1,79 @@ +# Copyright (C) 2015-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest +from glob import glob + + +# Generated with: +# +# id_length = 20 +# random.getrandbits(8 * id_length).to_bytes(id_length, 'big') +# +content_ids = [ + b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", + b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', + b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', + b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', + b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', +] + + +def test_archive_ls(swh_archiver_storage): + assert dict(swh_archiver_storage.archive_ls()) == { + 'present_archive': 'http://src:5003', + 'missing_archive': 'http://dest:5003'} + + +def 
test_content_archive_get(swh_archiver_storage): + for content_id in content_ids: + assert swh_archiver_storage.content_archive_get(content_id) == \ + (content_id, {'present_archive'}, {}) + + +def test_content_archive_get_copies(swh_archiver_storage): + assert list(swh_archiver_storage.content_archive_get_copies()) == [] + + +def test_content_archive_get_unarchived_copies(swh_archiver_storage): + retention_policy = 2 + assert list(swh_archiver_storage.content_archive_get_unarchived_copies( + retention_policy)) == [] + + +def test_content_archive_get_missing(swh_archiver_storage): + assert list(swh_archiver_storage.content_archive_get_missing( + content_ids, 'missing_archive')) == content_ids + + assert list(swh_archiver_storage.content_archive_get_missing( + content_ids, 'present_archive')) == [] + + with pytest.raises(ValueError): + list(swh_archiver_storage.content_archive_get_missing( + content_ids, 'unknown_archive')) + + +def test_content_archive_get_unknown(swh_archiver_storage): + assert list(swh_archiver_storage.content_archive_get_unknown( + content_ids)) == [] + + +def test_content_archive_update(swh_archiver_storage): + for content_id in content_ids: + swh_archiver_storage.content_archive_update( + content_id, 'present_archive', 'present') + swh_archiver_storage.content_archive_update( + content_id, 'missing_archive', 'present') + + swh_archiver_storage.close_logfile() + + # Make sure we created a logfile + logfile_base = swh_archiver_storage.logfile_base + files = glob('%s*' % logfile_base) + assert len(files) == 1 + + # make sure the logfile contains all our lines + lines = open(files[0]).readlines() + assert len(lines) == (2 * len(content_ids)) diff --git a/swh/archiver/tests/test_checker.py b/swh/archiver/tests/test_checker.py index 1d92b5b..7001a5d 100644 --- a/swh/archiver/tests/test_checker.py +++ b/swh/archiver/tests/test_checker.py @@ -1,155 +1,149 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import gzip import tempfile import unittest -from nose.tools import istest -from nose.plugins.attrib import attr +import pytest -from swh.objstorage.exc import ObjNotFoundError from swh.archiver.checker import RepairContentChecker from swh.model import hashutil +from swh.objstorage.exc import ObjNotFoundError class MockBackupObjStorage(): def __init__(self): self.values = {} def add(self, value, obj_id): self.values[obj_id] = value def get(self, obj_id): try: return self.values[obj_id] except KeyError: raise ObjNotFoundError(obj_id) -@attr('fs') +@pytest.mark.fs class TestRepairChecker(unittest.TestCase): """ Test the content integrity checker """ def setUp(self): super().setUp() self._alter_config() self.checker = RepairContentChecker() self.checker.backups = [MockBackupObjStorage(), MockBackupObjStorage()] def _alter_config(self): RepairContentChecker.parse_config_file = ( lambda cls: { 'storage': {'cls': 'pathslicing', 'args': {'root': tempfile.mkdtemp(), 'slicing': '0:2/2:4/4:6'}}, 'batch_size': 1000, 'log_tag': 'objstorage_test', 'backup_storages': {} } ) def _corrupt_content(self, obj_id): """ Make the given content invalid. 
""" hex_obj_id = hashutil.hash_to_hex(obj_id) file_path = self.checker.objstorage._obj_path(hex_obj_id) with gzip.open(file_path, 'wb') as f: f.write(b'Unexpected content') def _is_corrupted(self, obj_id): """ Ensure the given object is corrupted """ return self.checker._check_content(obj_id) == 'corrupted' def _is_missing(self, obj_id): """ Ensure the given object is missing """ return self.checker._check_content(obj_id) == 'missing' - @istest - def check_valid_content(self): + def test_check_valid_content(self): # Check that a valid content is valid. content = b'check_valid_content' obj_id = self.checker.objstorage.add(content) self.assertFalse(self._is_corrupted(obj_id)) self.assertFalse(self._is_missing(obj_id)) - @istest - def check_corrupted_content(self): + def test_check_corrupted_content(self): # Check that an invalid content is noticed. content = b'check_corrupted_content' obj_id = self.checker.objstorage.add(content) self._corrupt_content(obj_id) self.assertTrue(self._is_corrupted(obj_id)) self.assertFalse(self._is_missing(obj_id)) - @istest - def check_missing_content(self): - obj_id = hashutil.hash_data(b'check_missing_content')['sha1'] + def test_check_missing_content(self): + hashes = hashutil.MultiHash.from_data( + b'check_missing_content', hash_names=['sha1']).digest() + obj_id = hashes['sha1'] self.assertFalse(self._is_corrupted(obj_id)) self.assertTrue(self._is_missing(obj_id)) - @istest - def repair_content_present_first(self): + def test_repair_content_present_first(self): # Try to repair a content that is in the backup storage. content = b'repair_content_present_first' obj_id = self.checker.objstorage.add(content) # Add a content to the mock self.checker.backups[0].add(content, obj_id) # Corrupt and repair it. self._corrupt_content(obj_id) self.assertTrue(self._is_corrupted(obj_id)) self.checker.corrupted_content(obj_id) self.assertFalse(self._is_corrupted(obj_id)) - @istest - def repair_content_present_second(self): + def test_repair_content_present_second(self): # Try to repair a content that is in the backup storage. content = b'repair_content_present_first' obj_id = self.checker.objstorage.add(content) # Add a content to the mock self.checker.backups[-1].add(content, obj_id) # Corrupt and repair it. self._corrupt_content(obj_id) self.assertTrue(self._is_corrupted(obj_id)) self.checker.corrupted_content(obj_id) self.assertFalse(self._is_corrupted(obj_id)) - @istest - def repair_content_present_distributed(self): + def test_repair_content_present_distributed(self): # Try to repair two contents that are in separate backup storages. content1 = b'repair_content_present_distributed_2' content2 = b'repair_content_present_distributed_1' obj_id1 = self.checker.objstorage.add(content1) obj_id2 = self.checker.objstorage.add(content2) # Add content to the mock. self.checker.backups[0].add(content1, obj_id1) self.checker.backups[1].add(content2, obj_id2) # Corrupt the contents self._corrupt_content(obj_id1) self._corrupt_content(obj_id2) self.assertTrue(self._is_corrupted(obj_id1)) self.assertTrue(self._is_corrupted(obj_id2)) # Repare them self.checker.corrupted_content(obj_id1) self.checker.corrupted_content(obj_id2) self.assertFalse(self._is_corrupted(obj_id1)) self.assertFalse(self._is_corrupted(obj_id2)) - @istest - def repair_content_missing(self): + def test_repair_content_missing(self): # Try to repair a content that is NOT in the backup storage. 
content = b'repair_content_missing' obj_id = self.checker.objstorage.add(content) # Corrupt the content self._corrupt_content(obj_id) self.assertTrue(self._is_corrupted(obj_id)) # Try to repair it self.checker.corrupted_content(obj_id) self.assertTrue(self._is_corrupted(obj_id)) diff --git a/swh/archiver/updater.py b/swh/archiver/updater.py index 64cf1ce..f2aa1d3 100644 --- a/swh/archiver/updater.py +++ b/swh/archiver/updater.py @@ -1,56 +1,56 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -from swh.journal.client import SWHJournalClient +from swh.journal.client import JournalClient from .storage import get_archiver_storage -class SWHArchiverContentUpdater(SWHJournalClient): +class ArchiverContentUpdater(JournalClient): """Client in charge of updating new contents in the content_archiver db. This is a swh.journal client only dealing with contents. """ CONFIG_BASE_FILENAME = 'archiver/content_updater' ADDITIONAL_CONFIG = { 'archiver_storage': ( 'dict', { 'cls': 'db', 'args': { 'dbconn': 'dbname=softwareheritage-archiver-dev ' 'user=guest', } }), 'sources_present': ('list[str]', ['uffizi']) } def __init__(self): # Only interested in content here so override the configuration super().__init__(extra_configuration={'object_types': ['content']}) self.sources_present = self.config['sources_present'] self.archiver_storage = get_archiver_storage( **self.config['archiver_storage']) def process_objects(self, messages): self.archiver_storage.content_archive_add( (c[b'sha1'] for c in messages['content']), self.sources_present) if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='%(asctime)s %(process)d %(levelname)s %(message)s' ) - content_updater = SWHArchiverContentUpdater() + content_updater = ArchiverContentUpdater() content_updater.process() diff --git a/swh/archiver/worker.py b/swh/archiver/worker.py index 257cfdc..9c6ceca 100644 --- a/swh/archiver/worker.py +++ b/swh/archiver/worker.py @@ -1,431 +1,431 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import logging import random from collections import defaultdict from celery import group from swh.core import config, utils from swh.objstorage import get_objstorage from swh.objstorage.exc import Error, ObjNotFoundError from swh.model import hashutil from swh.scheduler.utils import get_task from .storage import get_archiver_storage from .copier import ArchiverCopier logger = logging.getLogger('archiver.worker') class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta): """Base archive worker. 
Inherit from this class and override: - ADDITIONAL_CONFIG: Some added configuration needed for the director to work - CONFIG_BASE_FILENAME: relative path to lookup for the configuration file - def need_archival(self, content_data): Determine if a content needs archival or not - def choose_backup_servers(self, present, missing): Choose which backup server to send copies to """ DEFAULT_CONFIG = { 'archiver_storage': ('dict', { 'cls': 'db', 'args': { 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest', }, }), 'storages': ('list[dict]', [ {'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}} ]) } ADDITIONAL_CONFIG = {} CONFIG_BASE_FILENAME = 'archiver/worker' objstorages = {} def __init__(self, batch): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.batch = batch self.archiver_db = get_archiver_storage( **self.config['archiver_storage']) self.objstorages = { storage['host']: get_objstorage(storage['cls'], storage['args']) for storage in self.config.get('storages', []) } self.set_objstorages = set(self.objstorages) def run(self): """Do the task expected from the archiver worker. Process the contents in self.batch, ensure that the elements still need an archival (using archiver db), and spawn copiers to copy files in each destination according to the archiver-worker's policy. """ transfers = defaultdict(list) for obj_id in self.batch: # Get dict {'missing': [servers], 'present': [servers]} # for contents ignoring those who don't need archival. copies = self.compute_copies(self.set_objstorages, obj_id) if not copies: # could not happen if using .director module msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id) logger.warning(msg) continue if not self.need_archival(copies): continue present = copies.get('present', set()) missing = copies.get('missing', set()) if len(present) == 0: msg = 'Lost content %s' % hashutil.hash_to_hex(obj_id) logger.critical(msg) continue # Choose servers to be used as srcs and dests. for src_dest in self.choose_backup_servers(present, missing): transfers[src_dest].append(obj_id) # Then run copiers for each of the required transfers. contents_copied = [] for (src, dest), content_ids in transfers.items(): contents_copied.extend(self.run_copier(src, dest, content_ids)) # copy is done, eventually do something else with them self.copy_finished(contents_copied) def compute_copies(self, set_objstorages, content_id): """From a content_id, return present and missing copies. Args: objstorages (set): objstorage's id name content_id: the content concerned Returns: dict: A dictionary with the following keys: - present: set of archives where the content is present - missing: set of archives where the content is missing - ongoing: dict mapping the archive id with the time the copy supposedly started. """ result = self.archiver_db.content_archive_get(content_id) if not result: return None _, present, ongoing = result set_present = set_objstorages & set(present) set_ongoing = set_objstorages & set(ongoing) set_missing = set_objstorages - set_present - set_ongoing return { 'present': set_present, 'missing': set_missing, 'ongoing': {archive: value for archive, value in ongoing.items() if archive in set_ongoing}, } def run_copier(self, source, destination, content_ids): """Run a copier in order to archive the given contents. 
Upload the given contents from the source to the destination. If the process fails, the whole content is considered uncopied and remains 'ongoing', waiting to be rescheduled as there is a delay. Args: source (str): source storage's identifier destination (str): destination storage's identifier content_ids ([sha1]): list of content ids to archive. """ # Check if there are any errors among the contents. content_status = self.get_contents_error(content_ids, source) # Iterates over the error detected. for content_id, real_status in content_status.items(): # Remove them from the to-archive list, # as they cannot be retrieved correctly. content_ids.remove(content_id) # Update their status to reflect their real state. self.archiver_db.content_archive_update( content_id, archive_id=source, new_status=real_status) # Now perform the copy on the remaining contents ac = ArchiverCopier( source=self.objstorages[source], destination=self.objstorages[destination], content_ids=content_ids) if ac.run(): # Once the archival complete, update the database. for content_id in content_ids: self.archiver_db.content_archive_update( content_id, archive_id=destination, new_status='present') return content_ids return [] def copy_finished(self, content_ids): """Hook to notify the content_ids archive copy is finished. (This is not an abstract method as this is optional """ pass def get_contents_error(self, content_ids, source_storage): """Indicates what is the error associated to a content when needed Check the given content on the given storage. If an error is detected, it will be reported through the returned dict. Args: content_ids ([sha1]): list of content ids to check source_storage (str): the source storage holding the contents to check. Returns: a dict that map {content_id -> error_status} for each content_id with an error. The `error_status` result may be 'missing' or 'corrupted'. """ content_status = {} storage = self.objstorages[source_storage] for content_id in content_ids: try: storage.check(content_id) except Error: content_status[content_id] = 'corrupted' logger.error('%s corrupted!' % hashutil.hash_to_hex( content_id)) except ObjNotFoundError: content_status[content_id] = 'missing' logger.error('%s missing!' % hashutil.hash_to_hex(content_id)) return content_status @abc.abstractmethod def need_archival(self, content_data): """Indicate if the content needs to be archived. Args: content_data (dict): dict that contains two lists 'present' and 'missing' with copies id corresponding to this status. Returns: True if there is not enough copies, False otherwise. """ pass @abc.abstractmethod def choose_backup_servers(self, present, missing): """Choose and yield the required amount of couple source/destination For each required copy, choose a unique destination server among the missing copies and a source server among the presents. Args: present: set of objstorage source name where the content is present missing: set of objstorage destination name where the content is missing Yields: tuple (source (str), destination (src)) for each required copy. """ pass class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker): """ Do the required backups on a given batch of contents. Process the content of a content batch in order to do the needed backups on the slaves servers. """ ADDITIONAL_CONFIG = { 'retention_policy': ('int', 2), 'archival_max_age': ('int', 3600), 'sources': ('list[str]', ['uffizi', 'banco']), } def __init__(self, batch): """ Constructor of the ArchiverWorker class. 
Args: batch: list of object's sha1 that potentially need archival. """ super().__init__(batch) config = self.config self.retention_policy = config['retention_policy'] self.archival_max_age = config['archival_max_age'] self.sources = config['sources'] if len(self.objstorages) < self.retention_policy: raise ValueError('Retention policy is too high for the number of ' 'provided servers') def need_archival(self, content_data): """ Indicate if the content need to be archived. Args: content_data (dict): dict that contains two lists 'present' and 'missing' with copies id corresponding to this status. Returns: True if there is not enough copies, False otherwise. """ nb_presents = len(content_data.get('present', [])) for copy, mtime in content_data.get('ongoing', {}).items(): if not self._is_archival_delay_elapsed(mtime): nb_presents += 1 return nb_presents < self.retention_policy def _is_archival_delay_elapsed(self, start_time): """ Indicates if the archival delay is elapsed given the start_time Args: start_time (float): time at which the archival started. Returns: True if the archival delay is elasped, False otherwise """ elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - start_time return elapsed > datetime.timedelta(seconds=self.archival_max_age) def choose_backup_servers(self, present, missing): """Choose and yield the required amount of couple source/destination For each required copy, choose a unique destination server among the missing copies and a source server among the presents. Each destination server is unique so after archival, the retention policy requirement will be fulfilled. However, the source server may be used multiple times. Args: present: set of objstorage source name where the content is present missing: set of objstorage destination name where the content is missing Yields: tuple (source, destination) for each required copy. """ # Transform from set to list to allow random selections missing = list(missing) present = list(present) all_sources = [source for source in present if source in self.sources] nb_required = self.retention_policy - len(present) destinations = random.sample(missing, nb_required) sources = [random.choice(all_sources) for dest in destinations] yield from zip(sources, destinations) class ArchiverToBackendWorker(BaseArchiveWorker): """Worker that sends copies over from a source to another backend. Process the content of a content batch from source objstorage to destination objstorage. """ CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' ADDITIONAL_CONFIG = { 'next_task': ( 'dict', { - 'queue': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask', + 'name': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask', 'batch_size': 10, } ) } def __init__(self, destination, batch): """Constructor of the ArchiverWorkerToBackend class. Args: destination: where to copy the objects from batch: sha1s to send to destination """ super().__init__(batch) self.destination = destination next_task = self.config['next_task'] if next_task: - destination_queue = next_task['queue'] - self.task_destination = get_task(destination_queue) + destination_task = next_task['name'] + self.task_destination = get_task(destination_task) self.batch_size = int(next_task['batch_size']) else: self.task_destination = self.batch_size = None def need_archival(self, content_data): """Indicate if the content needs to be archived. Args: content_data (dict): dict that contains 3 lists 'present', 'ongoing' and 'missing' with copies id corresponding to this status. 
Returns: True if we need to archive, False otherwise """ return self.destination in content_data.get('missing', {}) def choose_backup_servers(self, present, missing): """The destination is fixed to the destination mentioned. The only variable here is the source of information that we choose randomly in 'present'. Args: present: set of objstorage source name where the content is present missing: set of objstorage destination name where the content is missing Yields: tuple (source, destination) for each required copy. """ yield (random.choice(list(present)), self.destination) def copy_finished(self, content_ids): """Once the copy is finished, we'll send those batch of contents as done in the destination queue. """ if self.task_destination: groups = [] for ids in utils.grouper(content_ids, self.batch_size): sig_ids = self.task_destination.s(list(ids)) groups.append(sig_ids) group(groups).delay() diff --git a/version.txt b/version.txt index 5802a5c..34c1deb 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.4-0-ge34fa19 \ No newline at end of file +v0.0.6-0-g92eb856 \ No newline at end of file
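
For reference, the retention-policy selection that `test_choose_backup_servers`
exercises boils down to the logic below. This is a standalone sketch mirroring
`ArchiverWithRetentionPolicyWorker.choose_backup_servers` from the worker diff
above, not part of the patch; the free function and the 'src'/'dest'/'s3'
storage names are illustrative only.

```python
import random


def choose_backup_servers(present, missing, sources, retention_policy):
    # Mirror of ArchiverWithRetentionPolicyWorker.choose_backup_servers:
    # one distinct destination per missing copy, each paired with a
    # (possibly repeated) source taken from the allowed present copies.
    present = list(present)
    missing = list(missing)
    all_sources = [s for s in present if s in sources]
    nb_required = retention_policy - len(present)
    destinations = random.sample(missing, nb_required)
    srcs = [random.choice(all_sources) for _ in destinations]
    yield from zip(srcs, destinations)


# With retention_policy=2 and a single copy on 'src', exactly one
# (source, destination) pair comes out, as the test asserts.
pairs = list(choose_backup_servers({'src'}, {'dest', 's3'}, ['src'], 2))
assert len(pairs) == 1 and pairs[0][0] == 'src'
```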