diff --git a/PKG-INFO b/PKG-INFO index 427998d9..5ab92922 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.39 +Version: 0.0.40 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/bin/swh-objstorage-add-dir b/bin/swh-objstorage-add-dir index 651eeded..c1dd69d9 100755 --- a/bin/swh-objstorage-add-dir +++ b/bin/swh-objstorage-add-dir @@ -1,37 +1,37 @@ #!/usr/bin/python3 # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import sys from swh.storage import objstorage if __name__ == '__main__': try: root_dir = sys.argv[1] dirname = sys.argv[2] except IndexError: print("Usage: swh-objstorage-add-dir OBJ_STORAGE_DIR DATA_DIR") sys.exit(1) logging.basicConfig(level=logging.INFO) objs = objstorage.ObjStorage(root_dir) dups = 0 for root, _dirs, files in os.walk(dirname): for name in files: path = os.path.join(root, name) with open(path, 'rb') as f: try: - objs.add_file(f, length=os.path.getsize(path)) + objs.add(f.read()) except objstorage.DuplicateObjError: dups += 1 if dups: logging.info('skipped %d duplicate(s) file(s)' % dups) diff --git a/debian/control b/debian/control index 8c14e79d..db59af7c 100644 --- a/debian/control +++ b/debian/control @@ -1,24 +1,24 @@ Source: swh-storage Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-dateutil, python3-flask, python3-nose, python3-psycopg2, python3-requests, python3-setuptools, - python3-swh.core (>= 0.0.17~), + python3-swh.core (>= 0.0.20~), python3-vcversioner, python3-swh.scheduler, python3-click Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSTO/ Package: python3-swh.storage Architecture: all -Depends: python3-swh.core (>= 0.0.17~), ${misc:Depends}, ${python3:Depends} +Depends: python3-swh.core (>= 0.0.20~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage utilities diff --git a/debian/rules b/debian/rules index 9ad29f6a..73ffb1bc 100755 --- a/debian/rules +++ b/debian/rules @@ -1,14 +1,14 @@ #!/usr/bin/make -f # This file was automatically generated by stdeb 0.8.5 at # Tue, 22 Sep 2015 12:05:09 +0200 export PYBUILD_NAME=swh-storage %: dh $@ --with python3 --buildsystem=pybuild override_dh_auto_test: PYBUILD_SYSTEM=custom \ - PYBUILD_TEST_ARGS="python{version} -m nose swh -sva '!db'" \ + PYBUILD_TEST_ARGS="cd {build_dir}; python{version} -m nose swh -sva '!db'" \ dh_auto_test diff --git a/requirements.txt b/requirements.txt index faeaec3b..7f84a797 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,15 @@ dateutil psycopg2 vcversioner # remote storage API client requests # remote storage API server flask # Internal dependencies -swh.core >= 0.0.17 +swh.core >= 0.0.20 click swh.scheduler diff --git a/setup.py b/setup.py index 145e3590..783293d8 100644 --- a/setup.py +++ b/setup.py @@ -1,41 +1,42 @@ #!/usr/bin/env python3 from setuptools import setup def parse_requirements(): requirements = [] with open('requirements.txt') as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue 
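# Illustrative sketch (not part of this changeset): the change to
# bin/swh-objstorage-add-dir above replaces the streaming add_file(f, length=...)
# call with add(f.read()), i.e. each file is now read fully into memory before
# being handed to the object storage. A minimal, self-contained version of that
# ingestion loop against any storage exposing the new add(bytes) method
# (function and variable names here are illustrative only):
import os


def ingest_directory_sketch(objstorage, dirname):
    """Walk `dirname` and add every file's content to `objstorage`.

    Assumes objstorage.add(bytes) returns the object id (a sha1 digest with
    the default ID_HASH_ALGO) and is idempotent for already-present content.
    """
    obj_ids = []
    for root, _dirs, files in os.walk(dirname):
        for name in files:
            path = os.path.join(root, name)
            with open(path, 'rb') as f:
                data = f.read()          # whole file in memory, as in the script
            obj_ids.append(objstorage.add(data))
    return obj_ids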
requirements.append(line) return requirements setup( name='swh.storage', description='Software Heritage storage manager', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DSTO/', packages=[ 'swh.storage', 'swh.storage.archiver', 'swh.storage.api', + 'swh.storage.checker', 'swh.storage.objstorage', 'swh.storage.objstorage.api', 'swh.storage.tests', ], scripts=[ 'bin/swh-objstorage-add-dir', 'bin/swh-objstorage-fsck', 'bin/swh-storage-add-dir', ], install_requires=parse_requirements(), setup_requires=['vcversioner'], vcversioner={}, include_package_data=True, ) diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 427998d9..5ab92922 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.39 +Version: 0.0.40 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt index 2dc3b4fc..1f24a5c4 100644 --- a/swh.storage.egg-info/SOURCES.txt +++ b/swh.storage.egg-info/SOURCES.txt @@ -1,141 +1,146 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile Makefile.local README.db_testing README.dev requirements.txt setup.py version.txt bin/swh-objstorage-add-dir bin/swh-objstorage-fsck bin/swh-storage-add-dir debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/archiver-blueprint.md sql/.gitignore sql/Makefile sql/TODO sql/clusters.dot sql/swh-data.sql sql/swh-func.sql sql/swh-init.sql sql/swh-schema.sql sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/entity.lister_metadata.schema.json sql/doc/json/entity.metadata.schema.json sql/doc/json/entity_history.lister_metadata.schema.json sql/doc/json/entity_history.metadata.schema.json sql/doc/json/fetch_history.result.schema.json sql/doc/json/list_history.result.schema.json sql/doc/json/listable_entity.list_params.schema.json sql/doc/json/revision.metadata.schema.json sql/json/.gitignore sql/json/Makefile sql/json/entity.lister_metadata.schema.json sql/json/entity.metadata.schema.json sql/json/entity_history.lister_metadata.schema.json sql/json/entity_history.metadata.schema.json sql/json/fetch_history.result.schema.json sql/json/list_history.result.schema.json sql/json/listable_entity.list_params.schema.json sql/json/revision.metadata.schema.json sql/upgrades/015.sql sql/upgrades/016.sql sql/upgrades/017.sql sql/upgrades/018.sql sql/upgrades/019.sql sql/upgrades/020.sql sql/upgrades/021.sql sql/upgrades/022.sql sql/upgrades/023.sql sql/upgrades/024.sql sql/upgrades/025.sql sql/upgrades/026.sql sql/upgrades/027.sql sql/upgrades/028.sql sql/upgrades/029.sql sql/upgrades/030.sql sql/upgrades/032.sql sql/upgrades/033.sql sql/upgrades/034.sql sql/upgrades/035.sql sql/upgrades/036.sql sql/upgrades/037.sql sql/upgrades/038.sql sql/upgrades/039.sql sql/upgrades/040.sql sql/upgrades/041.sql sql/upgrades/042.sql sql/upgrades/043.sql sql/upgrades/044.sql sql/upgrades/045.sql sql/upgrades/046.sql sql/upgrades/047.sql sql/upgrades/048.sql sql/upgrades/049.sql sql/upgrades/050.sql sql/upgrades/051.sql sql/upgrades/052.sql sql/upgrades/053.sql sql/upgrades/054.sql sql/upgrades/055.sql sql/upgrades/056.sql 
sql/upgrades/057.sql sql/upgrades/058.sql sql/upgrades/059.sql sql/upgrades/060.sql sql/upgrades/061.sql sql/upgrades/062.sql sql/upgrades/063.sql sql/upgrades/064.sql sql/upgrades/065.sql sql/upgrades/066.sql sql/upgrades/067.sql sql/upgrades/068.sql sql/upgrades/069.sql swh.storage.egg-info/PKG-INFO swh.storage.egg-info/SOURCES.txt swh.storage.egg-info/dependency_links.txt swh.storage.egg-info/requires.txt swh.storage.egg-info/top_level.txt swh/storage/__init__.py swh/storage/converters.py swh/storage/db.py swh/storage/exc.py swh/storage/storage.py swh/storage/api/__init__.py swh/storage/api/client.py swh/storage/api/common.py swh/storage/api/server.py swh/storage/archiver/__init__.py swh/storage/archiver/copier.py swh/storage/archiver/director.py swh/storage/archiver/tasks.py swh/storage/archiver/worker.py +swh/storage/checker/__init__.py +swh/storage/checker/checker.py swh/storage/objstorage/__init__.py swh/storage/objstorage/objstorage.py +swh/storage/objstorage/objstorage_pathslicing.py swh/storage/objstorage/api/__init__.py swh/storage/objstorage/api/client.py swh/storage/objstorage/api/server.py swh/storage/tests/manual_test_archiver.py +swh/storage/tests/objstorage_testing.py swh/storage/tests/server_testing.py swh/storage/tests/test_api_client.py swh/storage/tests/test_archiver.py +swh/storage/tests/test_checker.py swh/storage/tests/test_converters.py swh/storage/tests/test_db.py -swh/storage/tests/test_objstorage.py swh/storage/tests/test_objstorage_api.py +swh/storage/tests/test_objstorage_pathslicing.py swh/storage/tests/test_storage.py utils/dump_revisions.py utils/fix_revisions_from_dump.py \ No newline at end of file diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index 0ae88685..86520905 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,8 +1,8 @@ click dateutil flask psycopg2 requests -swh.core>=0.0.17 +swh.core>=0.0.20 swh.scheduler vcversioner diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py index 74003f0d..15f26ff2 100644 --- a/swh/storage/archiver/director.py +++ b/swh/storage/archiver/director.py @@ -1,243 +1,243 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import swh import logging import click from datetime import datetime from swh.core import hashutil, config from swh.scheduler.celery_backend.config import app from . import tasks # NOQA DEFAULT_CONFIG = { 'objstorage_path': ('str', '/tmp/swh-storage/objects'), 'batch_max_size': ('int', 50), 'archival_max_age': ('int', 3600), 'retention_policy': ('int', 2), 'asynchronous': ('bool', True), 'dbconn': ('str', 'dbname=softwareheritage-dev user=guest') } task_name = 'swh.storage.archiver.tasks.SWHArchiverTask' logger = logging.getLogger() class ArchiverDirector(): """Process the files in order to know which one is needed as backup. The archiver director processes the files in the local storage in order to know which one needs archival and it delegates this task to archiver workers. Attributes: master_storage: the local storage of the master server. slave_storages: Iterable of remote obj storages to the slaves servers used for backup. config: Archiver_configuration. A dictionary that must contain the following keys. objstorage_path (string): the path of the objstorage of the master. 
batch_max_size (int): The number of content items that can be given to the same archiver worker. archival_max_age (int): Delay given to the worker to copy all the files in a given batch. retention_policy (int): Required number of copies for the content to be considered safe. asynchronous (boolean): Indicate whenever the archival should run in asynchronous mode or not. """ def __init__(self, db_conn, config): """ Constructor of the archiver director. Args: db_conn: db_conn: Either a libpq connection string, or a psycopg2 connection. config: Archiver_configuration. A dictionary that must contains the following keys. objstorage_path (string): the path of the objstorage of the master. batch_max_size (int): The number of content items that can be given to the same archiver worker. archival_max_age (int): Delay given to the worker to copy all the files in a given batch. retention_policy (int): Required number of copies for the content to be considered safe. asynchronous (boolean): Indicate whenever the archival should run in asynchronous mode or not. """ # Get the local storage of the master and remote ones for the slaves. self.master_storage_args = [db_conn, config['objstorage_path']] master_storage = swh.storage.get_storage('local_storage', self.master_storage_args) slaves = { id: url for id, url in master_storage.db.archive_ls() } # TODO Database should be initialized somehow before going in # production. For now, assumes that the database contains # datas for all the current content. self.master_storage = master_storage self.slave_storages = slaves self.config = config def run(self): """ Run the archiver director. The archiver director will check all the contents of the archiver database and do the required backup jobs. """ if self.config['asynchronous']: run_fn = self.run_async_worker else: run_fn = self.run_sync_worker for batch in self.get_unarchived_content(): run_fn(batch) def run_async_worker(self, batch): """ Produce a worker that will be added to the task queue. """ task = app.tasks[task_name] task.delay(batch, self.master_storage_args, - self.slave_storages, self.config['retention_policy']) + self.slave_storages, self.config) def run_sync_worker(self, batch): """ Run synchronously a worker on the given batch. """ task = app.tasks[task_name] task(batch, self.master_storage_args, self.slave_storages, self.config) def get_unarchived_content(self): """ get all the contents that needs to be archived. Yields: A batch of contents. Batches are dictionaries which associates a content id to the data about servers that contains it or not. {'id1': {'present': [('slave1', 'slave1_url')], 'missing': [('slave2', 'slave2_url'), ('slave3', 'slave3_url')] }, 'id2': {'present': [], 'missing': [ ('slave1', 'slave1_url'), ('slave2', 'slave2_url'), ('slave3', 'slave3_url') ]} } Where keys (idX) are sha1 of the content and (slaveX, slaveX_url) are ids and urls of the storage slaves. At least all the content that don't have enough copies on the backups servers are distributed into these batches. """ # Get the data about each content referenced into the archiver. 
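# Illustrative sketch (not part of this changeset): the batch structure that
# get_unarchived_content() yields, as documented above, shown with a small
# standalone helper grouping (content_id, server_id, virtual_status) rows into
# {content_id: {'present': [...], 'missing': [...]}}. Names are illustrative;
# `virtual_status` is assumed to already be 'present' or 'missing', i.e. the
# output of get_virtual_status() below.
def group_archive_rows_sketch(rows, slave_urls):
    batch = {}
    for content_id, server_id, virtual_status in rows:
        entry = batch.setdefault(content_id, {'present': [], 'missing': []})
        entry[virtual_status].append((server_id, slave_urls[server_id]))
    return batch


# Example: 'id1' still has a copy on slave1 but is missing from slave2.
example_batch = group_archive_rows_sketch(
    [('id1', 'slave1', 'present'), ('id1', 'slave2', 'missing')],
    {'slave1': 'http://slave1/', 'slave2': 'http://slave2/'})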
missing_copy = {} for content_id in self.master_storage.db.content_archive_ls(): db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0]) # Fetch the datas about archival status of the content backups = self.master_storage.db.content_archive_get( content=db_content_id ) for _content_id, server_id, status, mtime in backups: virtual_status = self.get_virtual_status(status, mtime) server_data = (server_id, self.slave_storages[server_id]) missing_copy.setdefault( db_content_id, {'present': [], 'missing': []} ).setdefault(virtual_status, []).append(server_data) # Check the content before archival. try: self.master_storage.objstorage.check(content_id[0]) except Exception as e: # Exception can be Error or ObjNotFoundError. logger.error(e) # TODO Do something to restore the content? if len(missing_copy) >= self.config['batch_max_size']: yield missing_copy missing_copy = {} if len(missing_copy) > 0: yield missing_copy def get_virtual_status(self, status, mtime): """ Compute the virtual presence of a content. If the status is ongoing but the time is not elasped, the archiver consider it will be present in the futur, and so consider it as present. However, if the time is elasped, the copy may have failed, so consider the content as missing. Arguments: status (string): One of ('present', 'missing', 'ongoing'). The status of the content. mtime (datetime): Time at which the content have been updated for the last time. Returns: The virtual status of the studied content, which is 'present' or 'missing'. Raises: ValueError: if the status is not one 'present', 'missing' or 'ongoing' """ if status in ('present', 'missing'): return status # If the status is 'ongoing' but there is still time, another worker # may still be on the task. if status == 'ongoing': mtime = mtime.replace(tzinfo=None) elapsed = (datetime.now() - mtime).total_seconds() if elapsed <= self.config['archival_max_age']: return 'present' else: return 'missing' else: raise ValueError("status must be either 'present', 'missing' " "or 'ongoing'") @click.command() @click.argument('config-path', required=1) @click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1], help="Connection string for the database") @click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1], help="Indicates if the archiver should run asynchronously") def launch(config_path, dbconn, async): # The configuration have following priority : # command line > file config > default config cl_config = { 'dbconn': dbconn, 'asynchronous': async } conf = config.read(config_path, DEFAULT_CONFIG) conf.update(cl_config) # Create connection data and run the archiver. archiver = ArchiverDirector(conf['dbconn'], conf) logger.info("Starting an archival at", datetime.now()) archiver.run() if __name__ == '__main__': launch() diff --git a/swh/storage/checker/__init__.py b/swh/storage/checker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py new file mode 100644 index 00000000..110d72ea --- /dev/null +++ b/swh/storage/checker/checker.py @@ -0,0 +1,171 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click +import logging + +from swh.core import config, hashutil +from .. 
import get_storage +from ..objstorage import PathSlicingObjStorage +from ..exc import ObjNotFoundError, Error + +DEFAULT_CONFIG = { + 'storage_path': ('str', '/srv/softwareheritage/objects'), + 'storage_depth': ('int', 3), + 'backup_url': ('str', 'http://uffizi:5002/'), + + 'batch_size': ('int', 1000), +} + + +class ContentChecker(): + """ Content integrity checker that will check local objstorage content. + + The checker will check the data of an object storage in order to verify + that no file have been corrupted. + + Attributes: + config: dictionary that contains this + checker configuration + objstorage (ObjStorage): Local object storage that will be checked. + master_storage (RemoteStorage): A distant storage that will be used to + restore corrupted content. + """ + + def __init__(self, config, root, depth, backup_urls): + """ Create a checker that ensure the objstorage have no corrupted file. + + Args: + config (dict): Dictionary that contains the following keys : + batch_size: Number of content that should be tested each + time the content checker runs. + root (string): Path to the objstorage directory + depth (int): Depth of the object storage. + backup_urls: List of url that can be contacted in order to + get a content. + """ + self.config = config + self.objstorage = PathSlicingObjStorage(root, depth, slicing=2) + self.backup_storages = [get_storage('remote_storage', [backup_url]) + for backup_url in backup_urls] + + def run(self): + """ Start the check routine + """ + corrupted_contents = [] + batch_size = self.config['batch_size'] + + for content_id in self.get_content_to_check(batch_size): + if not self.check_content(content_id): + corrupted_contents.append(content_id) + logging.error('The content', content_id, 'have been corrupted') + + self.repair_contents(corrupted_contents) + + def run_as_daemon(self): + """ Start the check routine and perform it forever. + + Use this method to run the checker when it's done as a daemon that + will iterate over the content forever in background. + """ + while True: + try: + self.run() + except Exception as e: + logging.error('An error occured while verifing the content: %s' + % e) + + def get_content_to_check(self, batch_size): + """ Get the content that should be verified. + + Returns: + An iterable of the content's id that need to be checked. + """ + contents = self.objstorage.get_random_contents(batch_size) + yield from contents + + def check_content(self, content_id): + """ Check the validity of the given content. + + Returns: + True if the content was valid, false if it was corrupted. + """ + try: + self.objstorage.check(content_id) + except (ObjNotFoundError, Error) as e: + logging.warning(e) + return False + else: + return True + + def repair_contents(self, content_ids): + """ Try to restore the given contents. + + Ask the backup storages for the contents that are corrupted on + the local object storage. + If the first storage does not contain the missing contents, send + a request to the second one with only the content that couldn't be + retrieved, and so on until there is no remaining content or servers. + + If a content couldn't be retrieved on all the servers, then log it as + an error. + """ + contents_to_get = set(content_ids) + # Iterates over the backup storages. + for backup_storage in self.backup_storages: + # Try to get all the contents that still need to be retrieved. 
+ contents = backup_storage.content_get(list(contents_to_get)) + for content in contents: + if content: + hash = content['sha1'] + data = content['data'] + # When a content is retrieved, remove it from the set + # of needed contents. + contents_to_get.discard(hash) + self.objstorage.restore(data) + + # Contents still in contents_to_get couldn't be retrieved. + if contents_to_get: + logging.error( + "Some corrupted contents could not be retrieved : %s" + % [hashutil.hash_to_hex(id) for id in contents_to_get] + ) + + +@click.command() +@click.argument('config-path', required=1) +@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1], + help='Path to the storage to verify') +@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1], + type=click.INT, help='Depth of the object storage') +@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1], + help='Url of a remote storage to retrieve corrupted content') +@click.option('--daemon/--nodaemon', default=True, + help='Indicates if the checker should run forever ' + 'or on a single batch of content') +def launch(config_path, storage_path, depth, backup_url, is_daemon): + # The configuration have following priority : + # command line > file config > default config + cl_config = { + 'storage_path': storage_path, + 'storage_depth': depth, + 'backup_url': backup_url + } + conf = config.read(config_path, DEFAULT_CONFIG) + conf.update(cl_config) + # Create the checker and run + checker = ContentChecker( + {'batch_size': conf['batch_size']}, + conf['storage_path'], + conf['storage_depth'], + map(lambda x: x.strip(), conf['backup_url'].split(',')) + ) + if is_daemon: + checker.run_as_daemon() + else: + checker.run() + +if __name__ == '__main__': + launch() diff --git a/swh/storage/converters.py b/swh/storage/converters.py index 9bf2a35a..7015fea5 100644 --- a/swh/storage/converters.py +++ b/swh/storage/converters.py @@ -1,360 +1,306 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import codecs import datetime import numbers +from swh.core.utils import decode_with_escape, encode_with_unescape DEFAULT_AUTHOR = { 'fullname': None, 'name': None, 'email': None, } DEFAULT_DATE = { 'timestamp': None, 'offset': 0, 'neg_utc_offset': None, } -def backslashescape_errors(exception): - if isinstance(exception, UnicodeDecodeError): - bad_data = exception.object[exception.start:exception.end] - escaped = ''.join(r'\x%02x' % x for x in bad_data) - return escaped, exception.end - - return codecs.backslashreplace_errors(exception) - -codecs.register_error('backslashescape', backslashescape_errors) - - -def decode_with_escape(value): - """Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences - as \\x. We also escape NUL bytes as they are invalid in JSON - strings. 
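# Illustrative sketch (not part of this changeset): decode_with_escape and
# encode_with_unescape are now imported from swh.core.utils instead of being
# defined here. The behaviour documented above (decode as utf-8 while escaping
# invalid byte sequences and NUL bytes as \xNN) can be approximated with the
# built-in 'backslashreplace' error handler; this is only an illustration of
# the idea, not the exact swh.core implementation.
def decode_with_escape_sketch(value):
    # Escape literal backslashes and NUL bytes first, then let any invalid
    # utf-8 byte become a \xNN escape during decoding.
    value = value.replace(b'\\', b'\\\\').replace(b'\x00', b'\\x00')
    return value.decode('utf-8', 'backslashreplace')


# b'caf\xe9' is not valid utf-8: the 0xe9 byte is escaped in the result.
assert decode_with_escape_sketch(b'caf\xe9') == 'caf\\xe9'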
- """ - # escape backslashes - value = value.replace(b'\\', b'\\\\') - value = value.replace(b'\x00', b'\\x00') - return value.decode('utf-8', 'backslashescape') - - -def encode_with_unescape(value): - """Encode an unicode string containing \\x backslash escapes""" - slices = [] - start = 0 - odd_backslashes = False - i = 0 - while i < len(value): - if value[i] == '\\': - odd_backslashes = not odd_backslashes - else: - if odd_backslashes: - if value[i] != 'x': - raise ValueError('invalid escape for %r at position %d' % - (value, i-1)) - slices.append( - value[start:i-1].replace('\\\\', '\\').encode('utf-8') - ) - slices.append(bytes.fromhex(value[i+1:i+3])) - - odd_backslashes = False - start = i = i + 3 - continue - - i += 1 - - slices.append( - value[start:i].replace('\\\\', '\\').encode('utf-8') - ) - - return b''.join(slices) - - def author_to_db(author): """Convert a swh-model author to its DB representation. Args: a swh-model compatible author Returns: a dict containing three keys: author, fullname and email """ if author is None: return DEFAULT_AUTHOR return author def db_to_author(id, fullname, name, email): """Convert the DB representation of an author to a swh-model author. Args: id (long): the author's identifier fullname (bytes): the author's fullname name (bytes): the author's name email (bytes): the author's email Returns: a dict with four keys: id, fullname, name and email, or None if the id is None """ if id is None: return None return { 'id': id, 'fullname': fullname, 'name': name, 'email': email, } def git_headers_to_db(git_headers): """Convert git headers to their database representation. We convert the bytes to unicode by decoding them into utf-8 and replacing invalid utf-8 sequences with backslash escapes. """ ret = [] for key, values in git_headers: if isinstance(values, list): ret.append([key, [decode_with_escape(value) for value in values]]) else: ret.append([key, decode_with_escape(values)]) return ret def db_to_git_headers(db_git_headers): ret = [] for key, values in db_git_headers: if isinstance(values, list): ret.append([key, [encode_with_unescape(value) for value in values]]) else: ret.append([key, encode_with_unescape(values)]) return ret def db_to_date(date, offset, neg_utc_offset): """Convert the DB representation of a date to a swh-model compatible date. Args: date (datetime.datetime): a date pulled out of the database offset (int): an integer number of minutes representing an UTC offset neg_utc_offset (boolean): whether an utc offset is negative Returns: a dict with three keys: timestamp: a timestamp from UTC offset: the number of minutes since UTC negative_utc: whether a null UTC offset is negative """ if date is None: return None return { 'timestamp': date.timestamp(), 'offset': offset, 'negative_utc': neg_utc_offset, } def date_to_db(date_offset): """Convert a swh-model date_offset to its DB representation. Args: a swh-model compatible date_offset Returns: a dict with three keys: timestamp: a date in ISO format offset: the UTC offset in minutes neg_utc_offset: a boolean indicating whether a null offset is negative or positive. 
""" if date_offset is None: return DEFAULT_DATE if isinstance(date_offset, numbers.Real): date_offset = datetime.datetime.fromtimestamp(date_offset, tz=datetime.timezone.utc) if isinstance(date_offset, datetime.datetime): timestamp = date_offset utcoffset = date_offset.utcoffset() offset = int(utcoffset.total_seconds()) // 60 neg_utc_offset = False if offset == 0 else None else: if isinstance(date_offset['timestamp'], numbers.Real): timestamp = datetime.datetime.fromtimestamp( date_offset['timestamp'], tz=datetime.timezone.utc) else: timestamp = date_offset['timestamp'] offset = date_offset['offset'] neg_utc_offset = date_offset.get('negative_utc', None) return { 'timestamp': timestamp.isoformat(), 'offset': offset, 'neg_utc_offset': neg_utc_offset, } def revision_to_db(revision): """Convert a swh-model revision to its database representation. """ author = author_to_db(revision['author']) date = date_to_db(revision['date']) committer = author_to_db(revision['committer']) committer_date = date_to_db(revision['committer_date']) metadata = revision['metadata'] if metadata and 'extra_headers' in metadata: metadata = metadata.copy() extra_headers = git_headers_to_db(metadata['extra_headers']) metadata['extra_headers'] = extra_headers return { 'id': revision['id'], 'author_fullname': author['fullname'], 'author_name': author['name'], 'author_email': author['email'], 'date': date['timestamp'], 'date_offset': date['offset'], 'date_neg_utc_offset': date['neg_utc_offset'], 'committer_fullname': committer['fullname'], 'committer_name': committer['name'], 'committer_email': committer['email'], 'committer_date': committer_date['timestamp'], 'committer_date_offset': committer_date['offset'], 'committer_date_neg_utc_offset': committer_date['neg_utc_offset'], 'type': revision['type'], 'directory': revision['directory'], 'message': revision['message'], 'metadata': metadata, 'synthetic': revision['synthetic'], 'parents': [ { 'id': revision['id'], 'parent_id': parent, 'parent_rank': i, } for i, parent in enumerate(revision['parents']) ], } def db_to_revision(db_revision): """Convert a database representation of a revision to its swh-model representation.""" author = db_to_author( db_revision['author_id'], db_revision['author_fullname'], db_revision['author_name'], db_revision['author_email'], ) date = db_to_date( db_revision['date'], db_revision['date_offset'], db_revision['date_neg_utc_offset'], ) committer = db_to_author( db_revision['committer_id'], db_revision['committer_fullname'], db_revision['committer_name'], db_revision['committer_email'], ) committer_date = db_to_date( db_revision['committer_date'], db_revision['committer_date_offset'], db_revision['committer_date_neg_utc_offset'] ) metadata = db_revision['metadata'] if metadata and 'extra_headers' in metadata: extra_headers = db_to_git_headers(metadata['extra_headers']) metadata['extra_headers'] = extra_headers parents = [] if 'parents' in db_revision: for parent in db_revision['parents']: if parent: parents.append(parent) return { 'id': db_revision['id'], 'author': author, 'date': date, 'committer': committer, 'committer_date': committer_date, 'type': db_revision['type'], 'directory': db_revision['directory'], 'message': db_revision['message'], 'metadata': metadata, 'synthetic': db_revision['synthetic'], 'parents': parents, } def release_to_db(release): """Convert a swh-model release to its database representation. 
""" author = author_to_db(release['author']) date = date_to_db(release['date']) return { 'id': release['id'], 'author_fullname': author['fullname'], 'author_name': author['name'], 'author_email': author['email'], 'date': date['timestamp'], 'date_offset': date['offset'], 'date_neg_utc_offset': date['neg_utc_offset'], 'name': release['name'], 'target': release['target'], 'target_type': release['target_type'], 'comment': release['message'], 'synthetic': release['synthetic'], } def db_to_release(db_release): """Convert a database representation of a release to its swh-model representation. """ author = db_to_author( db_release['author_id'], db_release['author_fullname'], db_release['author_name'], db_release['author_email'], ) date = db_to_date( db_release['date'], db_release['date_offset'], db_release['date_neg_utc_offset'] ) return { 'author': author, 'date': date, 'id': db_release['id'], 'name': db_release['name'], 'message': db_release['comment'], 'synthetic': db_release['synthetic'], 'target': db_release['target'], 'target_type': db_release['target_type'], } diff --git a/swh/storage/objstorage/__init__.py b/swh/storage/objstorage/__init__.py index 6ce4572f..02d3b0f7 100644 --- a/swh/storage/objstorage/__init__.py +++ b/swh/storage/objstorage/__init__.py @@ -1 +1,4 @@ -from .objstorage import ObjStorage, DIR_MODE, FILE_MODE # NOQA +from .objstorage import ObjStorage +from .objstorage_pathslicing import PathSlicingObjStorage + +__all__ = [ObjStorage, PathSlicingObjStorage] diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py index 8f030865..2daabeec 100644 --- a/swh/storage/objstorage/api/client.py +++ b/swh/storage/objstorage/api/client.py @@ -1,92 +1,103 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pickle import requests from requests.exceptions import ConnectionError from ...exc import StorageAPIError from ...api.common import (decode_response, encode_data_client as encode_data) class RemoteObjStorage(): """ Proxy to a remote object storage. This class allows to connect to an object storage server via http protocol. Attributes: base_url (string): The url of the server to connect. Must end with a '/' session: The session to send requests. """ def __init__(self, base_url): self.base_url = base_url self.session = requests.Session() def url(self, endpoint): return '%s%s' % (self.base_url, endpoint) def post(self, endpoint, data): try: response = self.session.post( self.url(endpoint), data=encode_data(data), headers={'content-type': 'application/x-msgpack'}, ) except ConnectionError as e: print(str(e)) raise StorageAPIError(e) # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) return decode_response(response) def content_add(self, bytes, obj_id=None): """ Add a new object to the object storage. Args: bytes: content of the object to be added to the storage. obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. """ return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id}) def content_get(self, obj_id): """ Retrieve the content of a given object. Args: obj_id: The id of the object. 
Returns: The content of the requested objects as bytes. Raises: ObjNotFoundError: if the requested object is missing """ return self.post('content/get', {'obj_id': obj_id}) + def content_get_random(self, batch_size): + """ Retrieve a random sample of existing content. + + Args: + batch_size: Number of content requested. + + Returns: + A list of random ids that represents existing contents. + """ + return self.post('content/get/random', {'batch_size': batch_size}) + def content_check(self, obj_id): """ Integrity check for a given object verify that the file object is in place, and that the gzipped content matches the object id Args: obj_id: The id of the object. Raises: ObjNotFoundError: if the requested object is missing Error: if the requested object is corrupt """ self.post('content/check', {'obj_id': obj_id}) diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py index cfb3d25d..968ef462 100644 --- a/swh/storage/objstorage/api/server.py +++ b/swh/storage/objstorage/api/server.py @@ -1,89 +1,97 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from flask import Flask, g, request from swh.core import config -from swh.storage.objstorage import ObjStorage +from swh.storage.objstorage import PathSlicingObjStorage from swh.storage.api.common import (BytesRequest, decode_request, error_handler, encode_data_server as encode_data) DEFAULT_CONFIG = { 'storage_base': ('str', '/tmp/swh-storage/objects/'), 'storage_depth': ('int', 3) } app = Flask(__name__) app.request_class = BytesRequest @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.before_request def before_request(): - g.objstorage = ObjStorage(app.config['storage_base'], - app.config['storage_depth']) + g.objstorage = PathSlicingObjStorage(app.config['storage_base'], + app.config['storage_depth'], + slicing=2) @app.route('/') def index(): return "SWH Objstorage API server" @app.route('/content') def content(): return str(list(g.storage)) @app.route('/content/add', methods=['POST']) def add_bytes(): - return encode_data(g.objstorage.add_bytes(**decode_request(request))) + return encode_data(g.objstorage.add(**decode_request(request))) @app.route('/content/get', methods=['POST']) def get_bytes(): - return encode_data(g.objstorage.get_bytes(**decode_request(request))) + return encode_data(g.objstorage.get(**decode_request(request))) + + +@app.route('/content/get/random', methods=['POST']) +def get_random_contents(): + return encode_data( + g.objstorage.get_random(**decode_request(request)) + ) @app.route('/content/check', methods=['POST']) def check(): return encode_data(g.objstorage.check(**decode_request(request))) def run_from_webserver(environ, start_response): """Run the WSGI app from the webserver, loading the configuration. 
""" config_path = '/etc/softwareheritage/storage/objstorage.ini' app.config.update(config.read(config_path, DEFAULT_CONFIG)) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5000, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app.config.update(config.read(config_path, DEFAULT_CONFIG)) app.run(host, port=int(port), debug=bool(debug)) if __name__ == '__main__': launch() diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py index 1f78de0c..c651b37c 100644 --- a/swh/storage/objstorage/objstorage.py +++ b/swh/storage/objstorage/objstorage.py @@ -1,386 +1,115 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import gzip -import os -import shutil -import tempfile -from contextlib import contextmanager +class ObjStorage(): + """ High-level API to manipulate the Software Heritage object storage. -from ..exc import ObjNotFoundError, Error -from swh.core import hashutil + Conceptually, the object storage offers 5 methods: - -ID_HASH_ALGO = 'sha1' -# ID_HASH_ALGO = 'sha1_git' - -GZIP_BUFSIZ = 1048576 - -DIR_MODE = 0o755 -FILE_MODE = 0o644 - - -def _obj_dir(hex_obj_id, root_dir, depth): - """compute the storage directory of an object - - Args: - hex_obj_id: object id as hexlified string - root_dir: object storage root directory - depth: slicing depth of object IDs in the storage - - see also: `_obj_path` - - """ - if len(hex_obj_id) < depth * 2: - raise ValueError('object id "%s" is too short for slicing at depth %d' - % (hex_obj_id, depth)) - - # compute [depth] substrings of [obj_id], each of length 2, starting from - # the beginning - id_steps = [hex_obj_id[i*2:i*2+2] for i in range(0, depth)] - steps = [root_dir] + id_steps - - return os.path.join(*steps) - - -def _obj_path(hex_obj_id, root_dir, depth): - """similar to `obj_dir`, but also include the actual object file name in the - returned path - - """ - return os.path.join(_obj_dir(hex_obj_id, root_dir, depth), hex_obj_id) - - -@contextmanager -def _write_obj_file(hex_obj_id, root_dir, depth): - """context manager for writing object files to the object storage - - During writing data are written to a temporary file, which is atomically - renamed to the right file name after closing. This context manager also - takes care of (gzip) compressing the data on the fly. 
- - Yields: - a file-like object open for writing bytes - - Sample usage: - - with _write_obj_file(hex_obj_id, root_dir, depth) as f: - f.write(obj_data) - - """ - dir = _obj_dir(hex_obj_id, root_dir, depth) - if not os.path.isdir(dir): - os.makedirs(dir, DIR_MODE, exist_ok=True) - - path = os.path.join(dir, hex_obj_id) - (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.', - dir=dir) - tmp_f = os.fdopen(tmp, 'wb') - with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f: - yield f - tmp_f.close() - os.chmod(tmp_path, FILE_MODE) - os.rename(tmp_path, path) - - -class ObjStorage: - """high-level API to manipulate the Software Heritage object storage - - Conceptually, the object storage offers 4 methods: - - - add() add a new object, returning an object id - __contains__() check if an object is present, by object id + - add() add a new object, returning an object id + - restore() same as add() but erase an already existed content - get() retrieve the content of an object, by object id - check() check the integrity of an object, by object id - Variants of the above methods are implemented by this class, depending on - how the content of an object is specified (bytes, file-like object, etc.). - - On disk, an object storage is a directory tree containing files named after - their object IDs. An object ID is a checksum of its content, depending on - the value of the ID_HASH_ALGO constant (see hashutil for its meaning). + And some management methods: - To avoid directories that contain too many files, the object storage has a - given depth (default: 3). Each depth level consumes two characters of the - object id. So for instance a file with (git) SHA1 of - 34973274ccef6ab4dfaaf86599792fa9c3fe4689 will be stored in an object - storage configured at depth 3 at - 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689. - - The actual files in the storage are stored in gzipped compressed format. - - Each file can hence be self-verified (on the shell) with something like: - - actual_id=34973274ccef6ab4dfaaf86599792fa9c3fe4689 - expected_id=$(zcat $filename | sha1sum | cut -f 1 -d' ') - if [ $actual_id != $expected_id ] ; then - echo "AYEEE, invalid object $actual_id /o\" - fi + - get_random() get random object id of existing contents (used for the + content integrity checker). + Each implementation of this interface can have a different behavior and + its own way to store the contents. """ - def __init__(self, root, depth=3): - """create a proxy object to the object storage - - Args: - root: object storage root directory - depth: slicing depth of object IDs in the storage - - """ - if not os.path.isdir(root): - raise ValueError('obj storage root "%s" is not a directory' - % root) - - self._root_dir = root - self._depth = depth - - self._temp_dir = os.path.join(root, 'tmp') - if not os.path.isdir(self._temp_dir): - os.makedirs(self._temp_dir, DIR_MODE, exist_ok=True) + def __contains__(self, *args, **kwargs): + raise NotImplementedError( + "Implementations of ObjStorage must have a '__contains__' method" + ) - def __obj_dir(self, hex_obj_id): - """_obj_dir wrapper using this storage configuration""" - return _obj_dir(hex_obj_id, self._root_dir, self._depth) + def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): + """ Add a new object to the object storage. 
- def __obj_path(self, hex_obj_id): - """_obj_path wrapper using this storage configuration""" - return _obj_path(hex_obj_id, self._root_dir, self._depth) - - def __contains__(self, obj_id): - """check whether a given object id is present in the storage or not - - Return: - True iff the object id is present in the storage + Args: + content: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + Returns: + the id of the object into the storage. """ - hex_obj_id = hashutil.hash_to_hex(obj_id) + raise NotImplementedError( + "Implementations of ObjStorage must have a 'add' method" + ) - return os.path.exists(_obj_path(hex_obj_id, self._root_dir, - self._depth)) + def restore(self, content, obj_id, *args, **kwargs): + """ Restore a content that have been corrupted. - def add_bytes(self, bytes, obj_id=None): - """add a new object to the object storage + This function is identical to add_bytes but does not check if + the object id is already in the file system. Args: - bytes: content of the object to be added to the storage + content: content of the object to be added to the storage obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. - - """ - if obj_id is None: - # missing checksum, compute it in memory and write to file - h = hashutil._new_hash(ID_HASH_ALGO, len(bytes)) - h.update(bytes) - obj_id = h.digest() - - if obj_id in self: - return obj_id - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - # object is either absent, or present but overwrite is requested - with _write_obj_file(hex_obj_id, - root_dir=self._root_dir, - depth=self._depth) as f: - f.write(bytes) - - return obj_id - - def add_file(self, f, length, obj_id=None): - """similar to `add_bytes`, but add the content of file-like object f to the - object storage - - add_file will read the file content only once, and avoid storing all of - it in memory - """ - if obj_id is None: - # unknkown object id: work on temp file, compute checksum as we go, - # mv temp file into place - (tmp, tmp_path) = tempfile.mkstemp(dir=self._temp_dir) - try: - t = os.fdopen(tmp, 'wb') - tz = gzip.GzipFile(fileobj=t) - sums = hashutil._hash_file_obj(f, length, - algorithms=[ID_HASH_ALGO], - chunk_cb=lambda b: tz.write(b)) - tz.close() - t.close() - - obj_id = sums[ID_HASH_ALGO] - if obj_id in self: - return obj_id - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - dir = self.__obj_dir(hex_obj_id) - if not os.path.isdir(dir): - os.makedirs(dir, DIR_MODE, exist_ok=True) - path = os.path.join(dir, hex_obj_id) - - os.chmod(tmp_path, FILE_MODE) - os.rename(tmp_path, path) - finally: - if os.path.exists(tmp_path): - os.unlink(tmp_path) - else: - # known object id: write to .new file, rename - if obj_id in self: - return obj_id - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - with _write_obj_file(hex_obj_id, - root_dir=self._root_dir, - depth=self._depth) as obj: - shutil.copyfileobj(f, obj) - - return obj_id - - @contextmanager - def get_file_obj(self, obj_id): - """context manager to read the content of an object - - Args: - obj_id: object id - - Yields: - a file-like object open for reading (bytes) - - Raises: - ObjNotFoundError: if the requested object is missing - - Sample usage: - - with 
objstorage.get_file_obj(obj_id) as f: - do_something(f.read()) - - """ - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - path = self.__obj_path(hex_obj_id) - with gzip.GzipFile(path, 'rb') as f: - yield f + raise NotImplemented( + "Implementations of ObjStorage must have a 'restore' method" + ) - def get_bytes(self, obj_id): - """retrieve the content of a given object + def get(self, obj_id, *args, **kwargs): + """ Retrieve the content of a given object. Args: - obj_id: object id + obj_id: object id. Returns: - the content of the requested objects as bytes + the content of the requested object as bytes. Raises: - ObjNotFoundError: if the requested object is missing - - """ - with self.get_file_obj(obj_id) as f: - return f.read() - - def _get_file_path(self, obj_id): - """retrieve the path of a given object in the objects storage - - Note that the path point to a gzip-compressed file, so you need - gzip.open() or equivalent to get the actual object content. - - Args: - obj_id: object id - - Returns: - a file path pointing into the object storage - - Raises: - ObjNotFoundError: if the requested object is missing - + ObjNotFoundError: if the requested object is missing. """ - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) + raise NotImplementedError( + "Implementations of ObjStorage must have a 'get' method" + ) - return self.__obj_path(hex_obj_id) + def check(self, obj_id, *args, **kwargs): + """ Perform an integrity check for a given object. - def check(self, obj_id): - """integrity check for a given object - - verify that the file object is in place, and that the gzipped content - matches the object id + Verify that the file object is in place and that the gziped content + matches the object id. Args: - obj_id: object id + obj_id: object id. Raises: - ObjNotFoundError: if the requested object is missing - Error: if the requested object is corrupt - - """ - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) - - try: - with gzip.open(self.__obj_path(hex_obj_id)) as f: - length = None - if ID_HASH_ALGO.endswith('_git'): - # if the hashing algorithm is git-like, we need to know the - # content size to hash on the fly. Do a first pass here to - # compute the size - length = 0 - while True: - chunk = f.read(GZIP_BUFSIZ) - length += len(chunk) - if not chunk: - break - f.rewind() - - checksums = hashutil._hash_file_obj(f, length, - algorithms=[ID_HASH_ALGO]) - actual_obj_id = checksums[ID_HASH_ALGO] - if obj_id != actual_obj_id: - raise Error('corrupt object %s should have id %s' % - (obj_id, actual_obj_id)) - except (OSError, IOError): - # IOError is for compatibility with older python versions - raise Error('corrupt object %s is not a gzip file' % obj_id) - - def __iter__(self): - """iterate over the object identifiers currently available in the storage - - Warning: with the current implementation of the object storage, this - method will walk the filesystem to list objects, meaning that listing - all objects will be very slow for large storages. You almost certainly - don't want to use this method in production. - - Return: - iterator over object IDs - + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. 
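# Illustrative sketch (not part of this changeset): a minimal in-memory
# implementation of the ObjStorage interface documented in this class, e.g.
# for tests. It follows the method contracts above; sha1 is used as the id
# algorithm, matching the default ID_HASH_ALGO of the on-disk storages.
import hashlib
import random

from swh.storage.exc import Error, ObjNotFoundError
from swh.storage.objstorage.objstorage import ObjStorage


class InMemoryObjStorageSketch(ObjStorage):

    def __init__(self):
        self.objects = {}

    def __contains__(self, obj_id):
        return obj_id in self.objects

    def add(self, content, obj_id=None, check_presence=True):
        if obj_id is None:
            obj_id = hashlib.sha1(content).digest()
        if check_presence and obj_id in self:
            return obj_id
        self.objects[obj_id] = content
        return obj_id

    def restore(self, content, obj_id=None):
        return self.add(content, obj_id, check_presence=False)

    def get(self, obj_id):
        if obj_id not in self:
            raise ObjNotFoundError(obj_id)
        return self.objects[obj_id]

    def check(self, obj_id):
        if hashlib.sha1(self.get(obj_id)).digest() != obj_id:
            raise Error('corrupt object %s' % obj_id)

    def get_random(self, batch_size):
        ids = list(self.objects)
        yield from random.sample(ids, min(batch_size, len(ids)))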
""" - def obj_iterator(): - # XXX hackish: it does not verify that the depth of found files - # matches the slicing depth of the storage - for root, _dirs, files in os.walk(self._root_dir): - for f in files: - yield bytes.fromhex(f) - - return obj_iterator() + raise NotImplementedError( + "Implementations of ObjStorage must have a 'check' method" + ) - def __len__(self): - """compute the number of objects available in the storage + def get_random(self, batch_size, *args, **kwargs): + """ Get random ids of existing contents - Warning: this currently uses `__iter__`, its warning about bad - performances applies + This method is used in order to get random ids to perform + content integrity verifications on random contents. - Return: - number of objects contained in the storage + Attributes: + batch_size (int): Number of ids that will be given + Yields: + An iterable of ids of contents that are in the current object + storage. """ - return sum(1 for i in self) + raise NotImplementedError( + "The current implementation of ObjStorage does not support " + "'get_random' operation" + ) diff --git a/swh/storage/objstorage/objstorage_pathslicing.py b/swh/storage/objstorage/objstorage_pathslicing.py new file mode 100644 index 00000000..875fd753 --- /dev/null +++ b/swh/storage/objstorage/objstorage_pathslicing.py @@ -0,0 +1,350 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +import gzip +import tempfile +import random + +from contextlib import contextmanager + +from swh.core import hashutil + +from .objstorage import ObjStorage +from ..exc import ObjNotFoundError, Error + + +ID_HASH_ALGO = 'sha1' + +GZIP_BUFSIZ = 1048576 + +DIR_MODE = 0o755 +FILE_MODE = 0o644 + + +@contextmanager +def _write_obj_file(hex_obj_id, objstorage): + """ Context manager for writing object files to the object storage. + + During writing, data are written to a temporary file, which is atomically + renamed to the right file name after closing. This context manager also + takes care of (gzip) compressing the data on the fly. + + Usage sample: + with _write_obj_file(hex_obj_id, objstorage): + f.write(obj_data) + + Yields: + a file-like object open for writing bytes. + """ + # Get the final paths and create the directory if absent. + dir = objstorage._obj_dir(hex_obj_id) + if not os.path.isdir(dir): + os.makedirs(dir, DIR_MODE, exist_ok=True) + path = os.path.join(dir, hex_obj_id) + + # Create a temporary file. + (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.', + dir=dir) + + # Open the file and yield it for writing. + tmp_f = os.fdopen(tmp, 'wb') + with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f: + yield f + + # Then close the temporary file and move it to the right directory. + tmp_f.close() + os.chmod(tmp_path, FILE_MODE) + os.rename(tmp_path, path) + + +@contextmanager +def _read_obj_file(hex_obj_id, objstorage): + """ Context manager for reading object file in the object storage. + + Usage sample: + with _read_obj_file(hex_obj_id, objstorage) as f: + b = f.read() + + Yields: + a file-like object open for reading bytes. + """ + path = objstorage._obj_path(hex_obj_id) + with gzip.GzipFile(path, 'rb') as f: + yield f + + +class PathSlicingObjStorage(ObjStorage): + """ Implementation of the ObjStorage API based on the hash of the content. 
+ + On disk, an object storage is a directory tree containing files named after + their object IDs. An object ID is a checksum of its content, depending on + the value of the ID_HASH_ALGO constant (see hashutil for its meaning). + + To avoid directories that contain too many files, the object storage has a + given depth. Each depth level consumes a given amount of characters of + the object id. + + So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 + will be stored in the given object storages : + + - depth=3, slicing=2 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + - depth=1, slicing=5 : 34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + + The files in the storage are stored in gzipped compressed format. + + Attributes: + root (string): path to the root directory of the storage on the disk. + depth (int): number of subdirectories created to store a file. + slicing (int): number of hash character consumed for each + subdirectories. + """ + + def __init__(self, root, depth, slicing): + """ Create an object to access a hash-slicing based object storage. + + Args: + root (string): path to the root directory of the storage on + the disk. + depth (int): number of subdirectories created to store a file. + slicing (int): number of hash character consumed for each + subdirectories. + """ + if not os.path.isdir(root): + raise ValueError( + 'PathSlicingObjStorage root "%s" is not a directory' % root + ) + + self.root = root + self.depth = depth + self.slicing = slicing + + def __contains__(self, obj_id): + """ Check whether the given object is present in the storage or not. + + Returns: + True iff the object is present in the storage. + """ + hex_obj_id = hashutil.hash_to_hex(obj_id) + return os.path.exists(self._obj_path(hex_obj_id)) + + def __iter__(self): + """iterate over the object identifiers currently available in the storage + + Warning: with the current implementation of the object storage, this + method will walk the filesystem to list objects, meaning that listing + all objects will be very slow for large storages. You almost certainly + don't want to use this method in production. + + Return: + iterator over object IDs + """ + def obj_iterator(): + # XXX hackish: it does not verify that the depth of found files + # matches the slicing depth of the storage + for root, _dirs, files in os.walk(self.root): + for f in files: + yield bytes.fromhex(f) + + return obj_iterator() + + def __len__(self): + """compute the number of objects available in the storage + + Warning: this currently uses `__iter__`, its warning about bad + performances applies + + Return: + number of objects contained in the storage + + """ + return sum(1 for i in self) + + def _obj_dir(self, hex_obj_id): + """ Compute the storage directory of an object. + + See also: PathSlicingObjStorage::_obj_path + + Args: + hex_obj_id: object id as hexlified string. + + Returns: + Path to the directory that contains the required object. + """ + if len(hex_obj_id) < self.depth * self.slicing: + raise ValueError( + 'Object id "%s" is to short for %d-slicing at depth %d' + % (hex_obj_id, self.slicing, self.depth) + ) + + # Compute [depth] substrings of [hex_obj_id], each of length [slicing], + # starting from the beginning. + id_steps = [hex_obj_id[i * self.slicing: + i * self.slicing + self.slicing] + for i in range(self.depth)] + steps = [self.root] + id_steps + + return os.path.join(*steps) + + def _obj_path(self, hex_obj_id): + """ Compute the full path to an object into the current storage. 
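# Illustrative sketch (not part of this changeset): the slicing rule described
# in the class docstring above, as a standalone function reproducing the two
# documented examples (POSIX path separator assumed).
import os


def sliced_path_sketch(root, hex_obj_id, depth, slicing):
    steps = [hex_obj_id[i * slicing:(i + 1) * slicing] for i in range(depth)]
    return os.path.join(root, *steps, hex_obj_id)


hex_id = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
assert sliced_path_sketch('', hex_id, depth=3, slicing=2) == \
    '34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689'
assert sliced_path_sketch('', hex_id, depth=1, slicing=5) == \
    '34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689'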
+
+        See also: PathSlicingObjStorage::_obj_dir
+
+        Args:
+            hex_obj_id: object id as hexlified string.
+
+        Returns:
+            Path to the actual object corresponding to the given id.
+        """
+        return os.path.join(self._obj_dir(hex_obj_id), hex_obj_id)
+
+    def add(self, bytes, obj_id=None, check_presence=True):
+        """ Add a new object to the object storage.
+
+        Args:
+            bytes: content of the object to be added to the storage.
+            obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
+                given, obj_id will be trusted to match the bytes. If missing,
+                obj_id will be computed on the fly.
+            check_presence: indicate if the presence of the content should be
+                verified before adding the file.
+
+        Returns:
+            the id of the object in the storage.
+        """
+        if obj_id is None:
+            # Checksum is missing, compute it on the fly.
+            h = hashutil._new_hash(ID_HASH_ALGO, len(bytes))
+            h.update(bytes)
+            obj_id = h.digest()
+
+        if check_presence and obj_id in self:
+            # If the object is already present, return immediately.
+            return obj_id
+
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        with _write_obj_file(hex_obj_id, self) as f:
+            f.write(bytes)
+
+        return obj_id
+
+    def restore(self, bytes, obj_id=None):
+        """ Restore a content that has been corrupted.
+
+        This function is identical to add() but does not check whether
+        the object id is already present in the file system.
+
+        Args:
+            bytes: content of the object to be added to the storage
+            obj_id: checksum of `bytes` as computed by ID_HASH_ALGO. When
+                given, obj_id will be trusted to match bytes. If missing,
+                obj_id will be computed on the fly.
+        """
+        return self.add(bytes, obj_id, check_presence=False)
+
+    def get(self, obj_id):
+        """ Retrieve the content of a given object.
+
+        Args:
+            obj_id: object id.
+
+        Returns:
+            the content of the requested object as bytes.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing.
+        """
+        if obj_id not in self:
+            raise ObjNotFoundError(obj_id)
+
+        # Open the file and return its content as bytes
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        with _read_obj_file(hex_obj_id, self) as f:
+            return f.read()
+
+    def check(self, obj_id):
+        """ Perform an integrity check for a given object.
+
+        Verify that the file object is in place and that the gzipped content
+        matches the object id.
+
+        Args:
+            obj_id: object id.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing.
+            Error: if the requested object is corrupted.
+        """
+        if obj_id not in self:
+            raise ObjNotFoundError(obj_id)
+
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+
+        try:
+            with gzip.open(self._obj_path(hex_obj_id)) as f:
+                length = None
+                if ID_HASH_ALGO.endswith('_git'):
+                    # If the hashing algorithm is git-like, we need to know
+                    # the content size to hash on the fly. Do a first pass
+                    # here to compute the size.
+                    length = 0
+                    while True:
+                        chunk = f.read(GZIP_BUFSIZ)
+                        length += len(chunk)
+                        if not chunk:
+                            break
+                    f.rewind()
+
+                checksums = hashutil._hash_file_obj(f, length,
+                                                    algorithms=[ID_HASH_ALGO])
+                actual_obj_id = checksums[ID_HASH_ALGO]
+                if obj_id != actual_obj_id:
+                    raise Error(
+                        'Corrupt object %s should have id %s'
+                        % (hashutil.hash_to_hex(obj_id),
+                           hashutil.hash_to_hex(actual_obj_id))
+                    )
+        except (OSError, IOError):
+            # IOError is for compatibility with older python versions
+            raise Error('Corrupt object %s is not a gzip file' % obj_id)
+
+    def get_random(self, batch_size):
+        """ Get random ids of existing contents
+
+        This method is used in order to get random ids to perform
+        content integrity verifications on random contents.
+ + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + def get_random_content(self, batch_size): + """ Get a batch of content inside a single directory. + + Returns: + a tuple (batch size, batch). + """ + dirs = [] + for level in range(self.depth): + path = os.path.join(self.root, *dirs) + dir_list = next(os.walk(path))[1] + if 'tmp' in dir_list: + dir_list.remove('tmp') + dirs.append(random.choice(dir_list)) + + path = os.path.join(self.root, *dirs) + content_list = next(os.walk(path))[2] + length = min(batch_size, len(content_list)) + return length, map(hashutil.hex_to_hash, + random.sample(content_list, length)) + + while batch_size: + length, it = get_random_content(self, batch_size) + batch_size = batch_size - length + yield from it diff --git a/swh/storage/storage.py b/swh/storage/storage.py index b5d1f5ff..3fb8cc6a 100644 --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -1,1078 +1,1078 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime import functools import itertools import dateutil.parser import psycopg2 from . import converters from .db import Db -from .objstorage import ObjStorage +from .objstorage import PathSlicingObjStorage from .exc import ObjNotFoundError, StorageDBError from swh.core.hashutil import ALGORITHMS # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 def db_transaction(meth): """decorator to execute Storage methods within DB transactions The decorated method must accept a `cur` keyword argument """ @functools.wraps(meth) def _meth(self, *args, **kwargs): with self.db.transaction() as cur: return meth(self, *args, cur=cur, **kwargs) return _meth def db_transaction_generator(meth): """decorator to execute Storage methods within DB transactions, while returning a generator The decorated method must accept a `cur` keyword argument """ @functools.wraps(meth) def _meth(self, *args, **kwargs): with self.db.transaction() as cur: yield from meth(self, *args, cur=cur, **kwargs) return _meth class Storage(): """SWH storage proxy, encompassing DB and object storage """ def __init__(self, db_conn, obj_root): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection obj_root: path to the root of the object storage """ try: if isinstance(db_conn, psycopg2.extensions.connection): self.db = Db(db_conn) else: self.db = Db.connect(db_conn) except psycopg2.OperationalError as e: raise StorageDBError(e) - self.objstorage = ObjStorage(obj_root) + self.objstorage = PathSlicingObjStorage(obj_root, depth=3, slicing=2) def content_add(self, content): """Add content blobs to the storage Note: in case of DB errors, objects might have already been added to the object storage and will not be removed. Since addition to the object storage is idempotent, that should not be a problem. Args: content: iterable of dictionaries representing individual pieces of content to add. 
Each dictionary has the following keys: - data (bytes): the actual content - length (int): content length (default: -1) - one key for each checksum algorithm in swh.core.hashutil.ALGORITHMS, mapped to the corresponding checksum - status (str): one of visible, hidden, absent - reason (str): if status = absent, the reason why - origin (int): if status = absent, the origin we saw the content in """ db = self.db content_by_status = defaultdict(list) for d in content: if 'status' not in d: d['status'] = 'visible' if 'length' not in d: d['length'] = -1 content_by_status[d['status']].append(d) content_with_data = content_by_status['visible'] content_without_data = content_by_status['absent'] missing_content = set(self.content_missing(content_with_data)) missing_skipped = set( sha1_git for sha1, sha1_git, sha256 in self.skipped_content_missing(content_without_data)) with db.transaction() as cur: if missing_content: # create temporary table for metadata injection db.mktemp('content', cur) def add_to_objstorage(cont): - self.objstorage.add_bytes(cont['data'], - obj_id=cont['sha1']) + self.objstorage.add(cont['data'], + obj_id=cont['sha1']) content_filtered = (cont for cont in content_with_data if cont['sha1'] in missing_content) db.copy_to(content_filtered, 'tmp_content', ['sha1', 'sha1_git', 'sha256', 'length', 'status'], cur, item_cb=add_to_objstorage) # move metadata in place db.content_add_from_temp(cur) if missing_skipped: missing_filtered = (cont for cont in content_without_data if cont['sha1_git'] in missing_skipped) db.mktemp('skipped_content', cur) db.copy_to(missing_filtered, 'tmp_skipped_content', ['sha1', 'sha1_git', 'sha256', 'length', 'reason', 'status', 'origin'], cur) # move metadata in place db.skipped_content_add_from_temp(cur) def content_get(self, content): """Retrieve in bulk contents and their data. Args: content: iterables of sha1 Returns: Generates streams of contents as dict with their raw data: - sha1: sha1's content - data: bytes data of the content Raises: ValueError in case of too much contents are required. cf. BULK_BLOCK_CONTENT_LEN_MAX """ # FIXME: Improve on server module to slice the result if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise ValueError( "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) for obj_id in content: try: - data = self.objstorage.get_bytes(obj_id) + data = self.objstorage.get(obj_id) except ObjNotFoundError: yield None continue yield {'sha1': obj_id, 'data': data} @db_transaction_generator def content_missing(self, content, key_hash='sha1', cur=None): """List content missing from storage Args: content: iterable of dictionaries containing one key for each checksum algorithm in swh.core.hashutil.ALGORITHMS, mapped to the corresponding checksum, and a length key mapped to the content length. key_hash: the name of the hash used as key (default: 'sha1') Returns: an iterable of `key_hash`es missing from the storage Raises: TODO: an exception when we get a hash collision. """ db = self.db keys = ['sha1', 'sha1_git', 'sha256'] if key_hash not in keys: raise ValueError("key_hash should be one of %s" % keys) key_hash_idx = keys.index(key_hash) # Create temporary table for metadata injection db.mktemp('content', cur) db.copy_to(content, 'tmp_content', keys + ['length'], cur) for obj in db.content_missing_from_temp(cur): yield obj[key_hash_idx] @db_transaction_generator def content_missing_per_sha1(self, contents, cur=None): """List content missing from storage based only on sha1. Args: contents: Iterable of sha1 to check for absence. 
Returns: an iterable of `sha1`s missing from the storage. Raises: TODO: an exception when we get a hash collision. """ db = self.db db.store_tmp_bytea(contents, cur) for obj in db.content_missing_per_sha1_from_temp(cur): yield obj[0] @db_transaction_generator def skipped_content_missing(self, content, cur=None): """List skipped_content missing from storage Args: content: iterable of dictionaries containing the data for each checksum algorithm. Returns: an iterable of signatures missing from the storage """ keys = ['sha1', 'sha1_git', 'sha256'] db = self.db db.mktemp('skipped_content', cur) db.copy_to(content, 'tmp_skipped_content', keys + ['length', 'reason'], cur) yield from db.skipped_content_missing_from_temp(cur) @db_transaction def content_find(self, content, cur=None): """Find a content hash in db. Args: content: a dictionary representing one content hash, mapping checksum algorithm names (see swh.core.hashutil.ALGORITHMS) to checksum values Returns: a triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. Raises: ValueError in case the key of the dictionary is not sha1, sha1_git nor sha256. """ db = self.db if not set(content).intersection(ALGORITHMS): raise ValueError('content keys must contain at least one of: ' 'sha1, sha1_git, sha256') c = db.content_find(sha1=content.get('sha1'), sha1_git=content.get('sha1_git'), sha256=content.get('sha256'), cur=cur) if c: keys = ['sha1', 'sha1_git', 'sha256', 'length', 'ctime', 'status'] return dict(zip(keys, c)) return None @db_transaction def content_find_occurrence(self, content, cur=None): """Find the content's occurrence. Args: content: a dictionary entry representing one content hash. The dictionary key is one of swh.core.hashutil.ALGORITHMS. The value mapped to the corresponding checksum. Returns: The occurrence of the content. Raises: ValueError in case the key of the dictionary is not sha1, sha1_git nor sha256. """ db = self.db c = self.content_find(content) if not c: return None sha1 = c['sha1'] found_occ = db.content_find_occurrence(sha1, cur=cur) if found_occ: keys = ['origin_type', 'origin_url', 'branch', 'target', 'target_type', 'path'] return dict(zip(keys, found_occ)) return None def directory_add(self, directories): """Add directories to the storage Args: directories: iterable of dictionaries representing the individual directories to add. Each dict has the following keys: - id (sha1_git): the id of the directory to add - entries (list): list of dicts for each entry in the directory. 
Each dict has the following keys: - name (bytes) - type (one of 'file', 'dir', 'rev'): type of the directory entry (file, directory, revision) - target (sha1_git): id of the object pointed at by the directory entry - perms (int): entry permissions """ dirs = set() dir_entries = { 'file': defaultdict(list), 'dir': defaultdict(list), 'rev': defaultdict(list), } for cur_dir in directories: dir_id = cur_dir['id'] dirs.add(dir_id) for src_entry in cur_dir['entries']: entry = src_entry.copy() entry['dir_id'] = dir_id dir_entries[entry['type']][dir_id].append(entry) dirs_missing = set(self.directory_missing(dirs)) if not dirs_missing: return db = self.db with db.transaction() as cur: # Copy directory ids dirs_missing_dict = ({'id': dir} for dir in dirs_missing) db.mktemp('directory', cur) db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) # Copy entries for entry_type, entry_list in dir_entries.items(): entries = itertools.chain.from_iterable( entries_for_dir for dir_id, entries_for_dir in entry_list.items() if dir_id in dirs_missing) db.mktemp_dir_entry(entry_type) db.copy_to( entries, 'tmp_directory_entry_%s' % entry_type, ['target', 'name', 'perms', 'dir_id'], cur, ) # Do the final copy db.directory_add_from_temp(cur) @db_transaction_generator def directory_missing(self, directories, cur): """List directories missing from storage Args: an iterable of directory ids Returns: a list of missing directory ids """ db = self.db # Create temporary table for metadata injection db.mktemp('directory', cur) directories_dicts = ({'id': dir} for dir in directories) db.copy_to(directories_dicts, 'tmp_directory', ['id'], cur) for obj in db.directory_missing_from_temp(cur): yield obj[0] @db_transaction_generator def directory_get(self, directories, cur=None): """Get information on directories. Args: - directories: an iterable of directory ids Returns: List of directories as dict with keys and associated values. """ db = self.db keys = ('id', 'dir_entries', 'file_entries', 'rev_entries') db.mktemp('directory', cur) db.copy_to(({'id': dir_id} for dir_id in directories), 'tmp_directory', ['id'], cur) dirs = db.directory_get_from_temp(cur) for line in dirs: yield dict(zip(keys, line)) @db_transaction_generator def directory_ls(self, directory, recursive=False, cur=None): """Get entries for one directory. Args: - directory: the directory to list entries from. - recursive: if flag on, this list recursively from this directory. Returns: List of entries for such directory. """ db = self.db keys = ['dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256'] if recursive: res_gen = db.directory_walk(directory) else: res_gen = db.directory_walk_one(directory) for line in res_gen: yield dict(zip(keys, line)) @db_transaction def directory_entry_get_by_path(self, directory, paths, cur=None): """Get the directory entry (either file or dir) from directory with path. Args: - directory: sha1 of the top level directory - paths: path to lookup from the top level directory. From left (top) to right (bottom). Returns: The corresponding directory entry if found, None otherwise. """ db = self.db keys = ('dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256') res = db.directory_entry_get_by_path(directory, paths, cur) if res: return dict(zip(keys, res)) def revision_add(self, revisions): """Add revisions to the storage Args: revisions: iterable of dictionaries representing the individual revisions to add. 
Each dict has the following keys: - id (sha1_git): id of the revision to add - date (datetime.DateTime): date the revision was written - date_offset (int): offset from UTC in minutes the revision was written - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - committer_date (datetime.DateTime): date the revision got added to the origin - committer_date_offset (int): offset from UTC in minutes the revision was added to the origin - committer_date_neg_utc_offset (boolean): whether a null committer_date_offset represents a negative UTC offset - type (one of 'git', 'tar'): type of the revision added - directory (sha1_git): the directory the revision points at - message (bytes): the message associated with the revision - author_name (bytes): the name of the revision author - author_email (bytes): the email of the revision author - committer_name (bytes): the name of the revision committer - committer_email (bytes): the email of the revision committer - metadata (jsonb): extra information as dictionary - synthetic (bool): revision's nature (tarball, directory creates synthetic revision) - parents (list of sha1_git): the parents of this revision """ db = self.db revisions_missing = set(self.revision_missing( set(revision['id'] for revision in revisions))) if not revisions_missing: return with db.transaction() as cur: db.mktemp_revision(cur) revisions_filtered = ( converters.revision_to_db(revision) for revision in revisions if revision['id'] in revisions_missing) parents_filtered = [] db.copy_to( revisions_filtered, 'tmp_revision', db.revision_add_cols, cur, lambda rev: parents_filtered.extend(rev['parents'])) db.revision_add_from_temp(cur) db.copy_to(parents_filtered, 'revision_history', ['id', 'parent_id', 'parent_rank'], cur) @db_transaction_generator def revision_missing(self, revisions, cur=None): """List revisions missing from storage Args: an iterable of revision ids Returns: a list of missing revision ids """ db = self.db db.store_tmp_bytea(revisions, cur) for obj in db.revision_missing_from_temp(cur): yield obj[0] @db_transaction_generator def revision_get(self, revisions, cur): """Get all revisions from storage Args: an iterable of revision ids Returns: an iterable of revisions as dictionaries (or None if the revision doesn't exist) """ db = self.db db.store_tmp_bytea(revisions, cur) for line in self.db.revision_get_from_temp(cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator def revision_log(self, revisions, limit=None, cur=None): """Fetch revision entry from the given root revisions. Args: - revisions: array of root revision to lookup - limit: limitation on the output result. Default to null. Yields: List of revision log from such revisions root. """ db = self.db for line in db.revision_log(revisions, limit, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator def revision_shortlog(self, revisions, limit=None, cur=None): """Fetch the shortlog for the given revisions Args: revisions: list of root revisions to lookup limit: depth limitation for the output Yields: a list of (id, parents) tuples. """ db = self.db yield from db.revision_shortlog(revisions, limit, cur) @db_transaction_generator def revision_log_by(self, origin_id, limit=None, cur=None): """Fetch revision entry from the actual origin_id's latest revision. 
""" db = self.db for line in db.revision_log_by(origin_id, limit, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data def release_add(self, releases): """Add releases to the storage Args: releases: iterable of dictionaries representing the individual releases to add. Each dict has the following keys: - id (sha1_git): id of the release to add - revision (sha1_git): id of the revision the release points to - date (datetime.DateTime): the date the release was made - date_offset (int): offset from UTC in minutes the release was made - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - name (bytes): the name of the release - comment (bytes): the comment associated with the release - author_name (bytes): the name of the release author - author_email (bytes): the email of the release author """ db = self.db release_ids = set(release['id'] for release in releases) releases_missing = set(self.release_missing(release_ids)) if not releases_missing: return with db.transaction() as cur: db.mktemp_release(cur) releases_filtered = ( converters.release_to_db(release) for release in releases if release['id'] in releases_missing ) db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, cur) db.release_add_from_temp(cur) @db_transaction_generator def release_missing(self, releases, cur=None): """List releases missing from storage Args: an iterable of release ids Returns: a list of missing release ids """ db = self.db # Create temporary table for metadata injection db.store_tmp_bytea(releases, cur) for obj in db.release_missing_from_temp(cur): yield obj[0] @db_transaction_generator def release_get(self, releases, cur=None): """Given a list of sha1, return the releases's information Args: releases: list of sha1s Returns: Generates the list of releases dict with the following keys: - id: origin's id - revision: origin's type - url: origin's url - lister: lister's uuid - project: project's uuid (FIXME, retrieve this information) Raises: ValueError if the keys does not match (url and type) nor id. """ db = self.db # Create temporary table for metadata injection db.store_tmp_bytea(releases, cur) for release in db.release_get_from_temp(cur): yield converters.db_to_release( dict(zip(db.release_get_cols, release)) ) @db_transaction def occurrence_add(self, occurrences, cur=None): """Add occurrences to the storage Args: occurrences: iterable of dictionaries representing the individual occurrences to add. Each dict has the following keys: - origin (int): id of the origin corresponding to the occurrence - branch (str): the reference name of the occurrence - target (sha1_git): the id of the object pointed to by the occurrence - target_type (str): the type of object pointed to by the occurrence - date (datetime.DateTime): the validity date for the given occurrence """ db = self.db processed = [] for occurrence in occurrences: if isinstance(occurrence['date'], str): occurrence['date'] = dateutil.parser.parse(occurrence['date']) processed.append(occurrence) db.mktemp_occurrence_history(cur) db.copy_to(processed, 'tmp_occurrence_history', ['origin', 'branch', 'target', 'target_type', 'date'], cur) db.occurrence_history_add_from_temp(cur) @db_transaction_generator def occurrence_get(self, origin_id, cur=None): """Retrieve occurrence information per origin_id. Args: origin_id: The occurrence's origin. Yields: List of occurrences matching criterion. 
""" db = self.db for line in db.occurrence_get(origin_id, cur): yield { 'origin': line[0], 'branch': line[1], 'target': line[2], 'target_type': line[3], } @db_transaction_generator def revision_get_by(self, origin_id, branch_name=None, timestamp=None, limit=None, cur=None): """Given an origin_id, retrieve occurrences' list per given criterions. Args: origin_id: The origin to filter on. branch_name: optional branch name. timestamp: limit: Yields: List of occurrences matching the criterions or None if nothing is found. """ for line in self.db.revision_get_by(origin_id, branch_name, timestamp, limit=limit, cur=cur): data = converters.db_to_revision( dict(zip(self.db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data def release_get_by(self, origin_id, limit=None): """Given an origin id, return all the tag objects pointing to heads of origin_id. Args: origin_id: the origin to filter on. limit: None by default Yields: List of releases matching the criterions or None if nothing is found. """ for line in self.db.release_get_by(origin_id, limit=limit): data = converters.db_to_release( dict(zip(self.db.release_get_cols, line)) ) yield data @db_transaction def object_find_by_sha1_git(self, ids, cur=None): """Return the objects found with the given ids. Args: ids: a generator of sha1_gits Returns: a dict mapping the id to the list of objects found. Each object found is itself a dict with keys: sha1_git: the input id type: the type of object found id: the id of the object found object_id: the numeric id of the object found. """ db = self.db ret = {id: [] for id in ids} for retval in db.object_find_by_sha1_git(ids): if retval[1]: ret[retval[0]].append(dict(zip(db.object_find_by_sha1_git_cols, retval))) return ret @db_transaction def origin_get(self, origin, cur=None): """Return the origin either identified by its id or its tuple (type, url). Args: origin: dictionary representing the individual origin to find. This dict has either the keys type and url: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) - url (bytes): the url the origin points to either the id: - id: the origin id Returns: the origin dict with the keys: - id: origin's id - type: origin's type - url: origin's url - lister: lister's uuid - project: project's uuid (FIXME, retrieve this information) Raises: ValueError if the keys does not match (url and type) nor id. """ db = self.db keys = ['id', 'type', 'url', 'lister', 'project'] origin_id = origin.get('id') if origin_id: # check lookup per id first ori = db.origin_get(origin_id, cur) elif 'type' in origin and 'url' in origin: # or lookup per type, url ori = db.origin_get_with(origin['type'], origin['url'], cur) else: # unsupported lookup raise ValueError('Origin must have either id or (type and url).') if ori: return dict(zip(keys, ori)) return None @db_transaction def _person_add(self, person, cur=None): """Add a person in storage. BEWARE: Internal function for now. Do not do anything fancy in case a person already exists. Please adapt code if more checks are needed. Args: person dictionary with keys name and email. Returns: Id of the new person. """ db = self.db return db.person_add(person) @db_transaction_generator def person_get(self, person, cur=None): """Return the persons identified by their ids. Args: person: array of ids. Returns: The array of persons corresponding of the ids. 
""" db = self.db for person in db.person_get(person): yield dict(zip(db.person_get_cols, person)) @db_transaction def origin_add_one(self, origin, cur=None): """Add origin to the storage Args: origin: dictionary representing the individual origin to add. This dict has the following keys: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) - url (bytes): the url the origin points to Returns: the id of the added origin, or of the identical one that already exists. """ db = self.db data = db.origin_get_with(origin['type'], origin['url'], cur) if data: return data[0] return db.origin_add(origin['type'], origin['url'], cur) @db_transaction def fetch_history_start(self, origin_id, cur=None): """Add an entry for origin origin_id in fetch_history. Returns the id of the added fetch_history entry """ fetch_history = { 'origin': origin_id, 'date': datetime.datetime.now(tz=datetime.timezone.utc), } return self.db.create_fetch_history(fetch_history, cur) @db_transaction def fetch_history_end(self, fetch_history_id, data, cur=None): """Close the fetch_history entry with id `fetch_history_id`, replacing its data with `data`. """ now = datetime.datetime.now(tz=datetime.timezone.utc) fetch_history = self.db.get_fetch_history(fetch_history_id, cur) if not fetch_history: raise ValueError('No fetch_history with id %d' % fetch_history_id) fetch_history['duration'] = now - fetch_history['date'] fetch_history.update(data) self.db.update_fetch_history(fetch_history, cur) @db_transaction def fetch_history_get(self, fetch_history_id, cur=None): """Get the fetch_history entry with id `fetch_history_id`. """ return self.db.get_fetch_history(fetch_history_id, cur) @db_transaction def entity_add(self, entities, cur=None): """Add the given entitites to the database (in entity_history). Args: - entities: iterable of dictionaries containing the following keys: - uuid (uuid): id of the entity - parent (uuid): id of the parent entity - name (str): name of the entity - type (str): type of entity (one of 'organization', 'group_of_entities', 'hosting', 'group_of_persons', 'person', 'project') - description (str, optional): description of the entity - homepage (str): url of the entity's homepage - active (bool): whether the entity is active - generated (bool): whether the entity was generated - lister_metadata (dict): lister-specific entity metadata - metadata (dict): other metadata for the entity - validity (datetime.DateTime array): timestamps at which we listed the entity. """ db = self.db cols = list(db.entity_history_cols) cols.remove('id') db.mktemp_entity_history() db.copy_to(entities, 'tmp_entity_history', cols, cur) db.entity_history_add_from_temp() @db_transaction_generator def entity_get_from_lister_metadata(self, entities, cur=None): """Fetch entities from the database, matching with the lister and associated metadata. Args: entities: iterable of dictionaries containing the lister metadata to look for. Useful keys are 'lister', 'type', 'id', ... Returns: A generator of fetched entities with all their attributes. If no match was found, the returned entity is None. 
""" db = self.db db.mktemp_entity_lister(cur) mapped_entities = [] for i, entity in enumerate(entities): mapped_entity = { 'id': i, 'lister_metadata': entity, } mapped_entities.append(mapped_entity) db.copy_to(mapped_entities, 'tmp_entity_lister', ['id', 'lister_metadata'], cur) cur.execute('''select id, %s from swh_entity_from_tmp_entity_lister() order by id''' % ','.join(db.entity_cols)) for id, *entity_vals in cur: fetched_entity = dict(zip(db.entity_cols, entity_vals)) if fetched_entity['uuid']: yield fetched_entity else: yield { 'uuid': None, 'lister_metadata': entities[i], } @db_transaction_generator def entity_get(self, uuid, cur=None): """Returns the list of entity per its uuid identifier and also its parent hierarchy. Args: uuid: entity's identifier Returns: List of entities starting with entity with uuid and the parent hierarchy from such entity. """ db = self.db for entity in db.entity_get(uuid, cur): yield dict(zip(db.entity_cols, entity)) @db_transaction def entity_get_one(self, uuid, cur=None): """Returns one entity using its uuid identifier. Args: uuid: entity's identifier Returns: the object corresponding to the given entity """ db = self.db entity = db.entity_get_one(uuid, cur) if entity: return dict(zip(db.entity_cols, entity)) else: return None @db_transaction def stat_counters(self, cur=None): """compute statistics about the number of tuples in various tables Returns: a dictionary mapping textual labels (e.g., content) to integer values (e.g., the number of tuples in table content) """ return {k: v for (k, v) in self.db.stat_counters()} diff --git a/swh/storage/tests/objstorage_testing.py b/swh/storage/tests/objstorage_testing.py new file mode 100644 index 00000000..79c3587e --- /dev/null +++ b/swh/storage/tests/objstorage_testing.py @@ -0,0 +1,70 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from nose.tools import istest + +from swh.core import hashutil +from swh.storage import exc + + +class ObjStorageTestFixture(): + + def setUp(self): + super().setUp() + + def hash_content(self, content): + obj_id = hashutil.hashdata(content)['sha1'] + return content, obj_id + + def assertContentMatch(self, obj_id, expected_content): + content = self.storage.get(obj_id) + self.assertEqual(content, expected_content) + + @istest + def add_get_w_id(self): + content, obj_id = self.hash_content(b'add_get_w_id') + r = self.storage.add(content, obj_id=obj_id) + self.assertEqual(obj_id, r) + self.assertContentMatch(obj_id, content) + + @istest + def add_get_wo_id(self): + content, obj_id = self.hash_content(b'add_get_wo_id') + r = self.storage.add(content) + self.assertEqual(obj_id, r) + self.assertContentMatch(obj_id, content) + + @istest + def restore_content(self): + valid_content, valid_obj_id = self.hash_content(b'restore_content') + invalid_content = b'unexpected content' + id_adding = self.storage.add(invalid_content, valid_obj_id) + id_restore = self.storage.restore(valid_content) + # Adding a false content then restore it to the right one and + # then perform a verification should result in a successful check. 
+ self.assertEqual(id_adding, valid_obj_id) + self.assertEqual(id_restore, valid_obj_id) + self.assertContentMatch(valid_obj_id, valid_content) + + @istest + def get_missing(self): + content, obj_id = self.hash_content(b'get_missing') + with self.assertRaises(exc.Error): + self.storage.get(obj_id) + + @istest + def check_missing(self): + content, obj_id = self.hash_content(b'check_missing') + with self.assertRaises(exc.Error): + self.storage.check(obj_id) + + @istest + def check_present(self): + content, obj_id = self.hash_content(b'check_missing') + self.storage.add(content) + try: + self.storage.check(obj_id) + except: + self.fail('Integrity check failed') diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py index cf05270d..0db83a2e 100644 --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -1,246 +1,245 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest import os from nose.tools import istest from nose.plugins.attrib import attr from datetime import datetime, timedelta from swh.core import hashutil from swh.core.tests.db_testing import DbTestFixture from server_testing import ServerTestFixture from swh.storage import Storage from swh.storage.exc import ObjNotFoundError from swh.storage.archiver import ArchiverDirector, ArchiverWorker from swh.storage.objstorage.api.client import RemoteObjStorage from swh.storage.objstorage.api.server import app TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') class TestArchiver(DbTestFixture, ServerTestFixture, unittest.TestCase): """ Test the objstorage archiver. """ TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') def setUp(self): # Launch the backup server self.backup_objroot = tempfile.mkdtemp(prefix='remote') self.config = {'storage_base': self.backup_objroot, 'storage_depth': 3} self.app = app super().setUp() # Launch a client to check objects presence - print("url", self.url()) self.remote_objstorage = RemoteObjStorage(self.url()) # Create the local storage. self.objroot = tempfile.mkdtemp(prefix='local') self.storage = Storage(self.conn, self.objroot) # Initializes and fill the tables. self.initialize_tables() # Create the archiver self.archiver = self.__create_director() self.storage_data = ('Local', 'http://localhost:%s/' % self.port) def tearDown(self): self.empty_tables() super().tearDown() def initialize_tables(self): """ Initializes the database with a sample of items. 
""" # Add an archive self.cursor.execute("""INSERT INTO archives(id, url) VALUES('Local', 'http://localhost:{}/') """.format(self.port)) self.conn.commit() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content_archive') self.cursor.execute('DELETE FROM archives') self.conn.commit() def __add_content(self, content_data, status='missing', date='now()'): # Add the content content = hashutil.hashdata(content_data) content.update({'data': content_data}) self.storage.content_add([content]) # Then update database content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) self.cursor.execute("""INSERT INTO content_archive VALUES('%s'::sha1, 'Local', '%s', %s) """ % (content_id, status, date)) return content['sha1'] def __get_missing(self): self.cursor.execute("""SELECT content_id FROM content_archive WHERE status='missing'""") return self.cursor.fetchall() def __create_director(self, batch_size=5000, archival_max_age=3600, retention_policy=1, asynchronous=False): config = { 'objstorage_path': self.objroot, 'batch_max_size': batch_size, 'archival_max_age': archival_max_age, 'retention_policy': retention_policy, 'asynchronous': asynchronous # Avoid depending on queue for tests. } director = ArchiverDirector(self.conn, config) return director def __create_worker(self, batch={}, config={}): mstorage_args = [self.archiver.master_storage.db.conn, self.objroot] slaves = [self.storage_data] if not config: config = self.archiver.config return ArchiverWorker(batch, mstorage_args, slaves, config) # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ content_data = b'archive_missing_content' id = self.__add_content(content_data) # After the run, the content should be in the archive. self.archiver.run() remote_data = self.remote_objstorage.content_get(id) self.assertEquals(content_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ id = self.__add_content(b'archive_present_content', status='present') # After the run, the content should NOT be in the archive.* self.archiver.run() with self.assertRaises(ObjNotFoundError): self.remote_objstorage.content_get(id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. """ id = self.__add_content(b'archive_alread_enough') director = self.__create_director(retention_policy=0) director.run() with self.assertRaises(ObjNotFoundError): self.remote_objstorage.content_get(id) # Unit test for ArchiverDirector def vstatus(self, status, mtime): return self.archiver.get_virtual_status(status, mtime) @istest def vstatus_present(self): self.assertEquals( self.vstatus('present', None), 'present' ) @istest def vstatus_missing(self): self.assertEquals( self.vstatus('missing', None), 'missing' ) @istest def vstatus_ongoing_remaining(self): current_time = datetime.now() self.assertEquals( self.vstatus('ongoing', current_time), 'present' ) @istest def vstatus_ongoing_elapsed(self): past_time = datetime.now() - timedelta( seconds=self.archiver.config['archival_max_age'] + 1 ) self.assertEquals( self.vstatus('ongoing', past_time), 'missing' ) # Unit tests for archive worker @istest def need_archival_missing(self): """ A content should still need archival when it is missing. 
""" id = self.__add_content(b'need_archival_missing', status='missing') id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest def need_archival_present(self): """ A content should still need archival when it is missing """ id = self.__add_content(b'need_archival_missing', status='present') id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), False) @istest def need_archival_ongoing_remaining(self): """ An ongoing archival with remaining time shouldnt need archival. """ id = self.__add_content(b'need_archival_ongoing_remaining', status='ongoing', date="'%s'" % datetime.now()) id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), False) @istest def need_archival_ongoing_elasped(self): """ An ongoing archival with elapsed time should be scheduled again. """ id = self.__add_content( b'archive_ongoing_elapsed', status='ongoing', date="'%s'" % (datetime.now() - timedelta( seconds=self.archiver.config['archival_max_age'] + 1 )) ) id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest def content_sorting_by_archiver(self): """ Check that the content is correctly sorted. """ batch = { 'id1': { 'present': [('slave1', 'slave1_url')], 'missing': [] }, 'id2': { 'present': [], 'missing': [('slave1', 'slave1_url')] } } worker = self.__create_worker(batch=batch) mapping = worker.sort_content_by_archive() self.assertNotIn('id1', mapping[('slave1', 'slave1_url')]) self.assertIn('id2', mapping[('slave1', 'slave1_url')]) diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py new file mode 100644 index 00000000..95e96a1f --- /dev/null +++ b/swh/storage/tests/test_checker.py @@ -0,0 +1,128 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import gzip +import tempfile +import unittest + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core import hashutil +from swh.storage.checker.checker import ContentChecker + + +class MockBackupStorage(): + + def __init__(self): + self.values = {} + + def content_add(self, id, value): + self.values[id] = value + + def content_get(self, ids): + for id in ids: + try: + data = self.values[id] + except KeyError: + yield None + continue + + yield {'sha1': id, 'data': data} + + +@attr('fs') +class TestChecker(unittest.TestCase): + """ Test the content integrity checker + """ + + def setUp(self): + super().setUp() + # Connect to an objstorage + config = {'batch_size': 10} + path = tempfile.mkdtemp() + depth = 3 + self.checker = ContentChecker(config, path, depth, 'http://None') + self.checker.backup_storages = [MockBackupStorage(), + MockBackupStorage()] + + def corrupt_content(self, id): + """ Make the given content invalid. + """ + hex_id = hashutil.hash_to_hex(id) + file_path = self.checker.objstorage._obj_path(hex_id) + with gzip.open(file_path, 'wb') as f: + f.write(b'Unexpected content') + + @istest + def check_valid_content(self): + # Check that a valid content is valid. 
+ content = b'check_valid_content' + id = self.checker.objstorage.add(content) + self.assertTrue(self.checker.check_content(id)) + + @istest + def check_invalid_content(self): + # Check that an invalid content is noticed. + content = b'check_invalid_content' + id = self.checker.objstorage.add(content) + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + + @istest + def repair_content_present_first(self): + # Try to repair a content that is in the backup storage. + content = b'repair_content_present_first' + id = self.checker.objstorage.add(content) + # Add a content to the mock + self.checker.backup_storages[0].content_add(id, content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertTrue(self.checker.check_content(id)) + + @istest + def repair_content_present_second(self): + # Try to repair a content that is not in the first backup storage. + content = b'repair_content_present_second' + id = self.checker.objstorage.add(content) + # Add a content to the mock + self.checker.backup_storages[1].content_add(id, content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertTrue(self.checker.check_content(id)) + + @istest + def repair_content_present_distributed(self): + # Try to repair two contents that are in separate backup storages. + content1 = b'repair_content_present_distributed_2' + content2 = b'repair_content_present_distributed_1' + id1 = self.checker.objstorage.add(content1) + id2 = self.checker.objstorage.add(content2) + # Add content to the mock. + self.checker.backup_storages[0].content_add(id1, content1) + self.checker.backup_storages[0].content_add(id2, content2) + # Corrupt and repair it + self.corrupt_content(id1) + self.corrupt_content(id2) + self.assertFalse(self.checker.check_content(id1)) + self.assertFalse(self.checker.check_content(id2)) + self.checker.repair_contents([id1, id2]) + self.assertTrue(self.checker.check_content(id1)) + self.assertTrue(self.checker.check_content(id2)) + + @istest + def repair_content_missing(self): + # Try to repair a content that is NOT in the backup storage. + content = b'repair_content_present' + id = self.checker.objstorage.add(content) + # Corrupt and repair it. 
+ self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertFalse(self.checker.check_content(id)) diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py index b0227505..f5ce9778 100644 --- a/swh/storage/tests/test_converters.py +++ b/swh/storage/tests/test_converters.py @@ -1,207 +1,130 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from nose.plugins.attrib import attr from swh.storage import converters @attr('!db') class TestConverters(unittest.TestCase): def setUp(self): self.maxDiff = None @istest def db_to_author(self): # when actual_author = converters.db_to_author( 1, b'fullname', b'name', b'email') # then self.assertEquals(actual_author, { 'id': 1, 'fullname': b'fullname', 'name': b'name', 'email': b'email', }) @istest def db_to_revision(self): # when actual_revision = converters.db_to_revision({ 'id': 'revision-id', 'date': None, 'date_offset': None, 'date_neg_utc_offset': None, 'committer_date': None, 'committer_date_offset': None, 'committer_date_neg_utc_offset': None, 'type': 'rev', 'directory': b'dir-sha1', 'message': b'commit message', 'author_id': 'auth-id', 'author_fullname': b'auth-fullname', 'author_name': b'auth-name', 'author_email': b'auth-email', 'committer_id': 'comm-id', 'committer_fullname': b'comm-fullname', 'committer_name': b'comm-name', 'committer_email': b'comm-email', 'metadata': {}, 'synthetic': False, 'parents': [123, 456] }) # then self.assertEquals(actual_revision, { 'id': 'revision-id', 'author': { 'id': 'auth-id', 'fullname': b'auth-fullname', 'name': b'auth-name', 'email': b'auth-email', }, 'date': None, 'committer': { 'id': 'comm-id', 'fullname': b'comm-fullname', 'name': b'comm-name', 'email': b'comm-email', }, 'committer_date': None, 'type': 'rev', 'directory': b'dir-sha1', 'message': b'commit message', 'metadata': {}, 'synthetic': False, 'parents': [123, 456], }) @istest def db_to_release(self): # when actual_release = converters.db_to_release({ 'id': b'release-id', 'target': b'revision-id', 'target_type': 'revision', 'date': None, 'date_offset': None, 'date_neg_utc_offset': None, 'name': b'release-name', 'comment': b'release comment', 'synthetic': True, 'author_id': 'auth-id', 'author_fullname': b'auth-fullname', 'author_name': b'auth-name', 'author_email': b'auth-email', }) # then self.assertEquals(actual_release, { 'author': { 'id': 'auth-id', 'fullname': b'auth-fullname', 'name': b'auth-name', 'email': b'auth-email', }, 'date': None, 'id': b'release-id', 'name': b'release-name', 'message': b'release comment', 'synthetic': True, 'target': b'revision-id', 'target_type': 'revision' }) - @istest - def backslashescape_errors(self): - raw_data_err = b'abcd\x80' - with self.assertRaises(UnicodeDecodeError): - raw_data_err.decode('utf-8', 'strict') - - self.assertEquals( - raw_data_err.decode('utf-8', 'backslashescape'), - 'abcd\\x80', - ) - - raw_data_ok = b'abcd\xc3\xa9' - self.assertEquals( - raw_data_ok.decode('utf-8', 'backslashescape'), - raw_data_ok.decode('utf-8', 'strict'), - ) - - unicode_data = 'abcdef\u00a3' - self.assertEquals( - unicode_data.encode('ascii', 'backslashescape'), - b'abcdef\\xa3', - ) - - @istest - def encode_with_unescape(self): - valid_data = '\\x01020304\\x00' - valid_data_encoded 
= b'\x01020304\x00' - - self.assertEquals( - valid_data_encoded, - converters.encode_with_unescape(valid_data) - ) - - @istest - def encode_with_unescape_invalid_escape(self): - invalid_data = 'test\\abcd' - - with self.assertRaises(ValueError) as exc: - converters.encode_with_unescape(invalid_data) - - self.assertIn('invalid escape', exc.exception.args[0]) - self.assertIn('position 4', exc.exception.args[0]) - - @istest - def decode_with_escape(self): - backslashes = b'foo\\bar\\\\baz' - backslashes_escaped = 'foo\\\\bar\\\\\\\\baz' - - self.assertEquals( - backslashes_escaped, - converters.decode_with_escape(backslashes), - ) - - valid_utf8 = b'foo\xc3\xa2' - valid_utf8_escaped = 'foo\u00e2' - - self.assertEquals( - valid_utf8_escaped, - converters.decode_with_escape(valid_utf8), - ) - - invalid_utf8 = b'foo\xa2' - invalid_utf8_escaped = 'foo\\xa2' - - self.assertEquals( - invalid_utf8_escaped, - converters.decode_with_escape(invalid_utf8), - ) - - valid_utf8_nul = b'foo\xc3\xa2\x00' - valid_utf8_nul_escaped = 'foo\u00e2\\x00' - - self.assertEquals( - valid_utf8_nul_escaped, - converters.decode_with_escape(valid_utf8_nul), - ) - @istest def db_to_git_headers(self): raw_data = [ ['gpgsig', b'garbage\x89a\x43b\x14'], ['extra', [b'fo\\\\\\o', b'bar\\', b'inval\\\\\x99id']], ] db_data = converters.git_headers_to_db(raw_data) loop = converters.db_to_git_headers(db_data) self.assertEquals(raw_data, loop) diff --git a/swh/storage/tests/test_objstorage.py b/swh/storage/tests/test_objstorage.py deleted file mode 100644 index 347d189d..00000000 --- a/swh/storage/tests/test_objstorage.py +++ /dev/null @@ -1,157 +0,0 @@ -# Copyright (C) 2015 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import gzip -import os -import shutil -import stat -import tempfile -import unittest - -from io import BytesIO -from nose.tools import istest - -from swh.core import hashutil -from swh.storage import objstorage -from swh.storage import exc - - -class TestObjStorage(unittest.TestCase): - - def setUp(self): - self.content = b'42\n' - - # sha1 - self.hex_obj_id = '34973274ccef6ab4dfaaf86599792fa9c3fe4689' - - # sha1_git - # self.hex_obj_id = 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd' - - self.obj_id = hashutil.hex_to_hash(self.hex_obj_id) - self.obj_steps = [self.hex_obj_id[0:2], self.hex_obj_id[2:4], - self.hex_obj_id[4:6]] - self.obj_relpath = os.path.join(*(self.obj_steps + [self.hex_obj_id])) - - self.tmpdir = tempfile.mkdtemp() - self.obj_dirs = [ - os.path.join(self.tmpdir, *self.obj_steps[:i]) - for i in range(1, len(self.obj_steps)) - ] - self.obj_path = os.path.join(self.tmpdir, self.obj_relpath) - - self.storage = objstorage.ObjStorage(root=self.tmpdir, depth=3) - - self.missing_obj_id = hashutil.hex_to_hash( - 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15') - - def tearDown(self): - shutil.rmtree(self.tmpdir) - - def assertGzipContains(self, gzip_path, content): # noqa - self.assertEqual(gzip.open(gzip_path, 'rb').read(), content) - - @istest - def add_bytes_w_id(self): - r = self.storage.add_bytes(self.content, obj_id=self.obj_id) - self.assertEqual(r, self.obj_id) - self.assertGzipContains(self.obj_path, self.content) - - @istest - def add_bytes_wo_id(self): - r = self.storage.add_bytes(self.content) - self.assertEqual(r, self.obj_id) - self.assertGzipContains(self.obj_path, self.content) - - @istest - def add_file_w_id(self): - r 
= self.storage.add_file(BytesIO(self.content), - len(self.content), - obj_id=self.obj_id) - self.assertEqual(r, self.obj_id) - self.assertGzipContains(self.obj_path, self.content) - - @istest - def add_file_wo_id(self): - r = self.storage.add_file(BytesIO(self.content), len(self.content)) - self.assertEqual(r, self.obj_id) - self.assertGzipContains(self.obj_path, self.content) - - @istest - def contains(self): - self.storage.add_bytes(self.content, obj_id=self.obj_id) - self.assertIn(self.obj_id, self.storage) - self.assertNotIn(self.missing_obj_id, self.storage) - - @istest - def check_ok(self): - self.storage.add_bytes(self.content, obj_id=self.obj_id) - try: - self.storage.check(self.obj_id) - except: - self.fail('integrity check failed') - - @istest - def check_missing(self): - with self.assertRaises(exc.Error): - self.storage.check(self.obj_id) - - @istest - def check_file_and_dirs_mode(self): - old_umask = os.umask(0) - self.storage.add_bytes(self.content, obj_id=self.obj_id) - for dir in self.obj_dirs: - stat_dir = os.stat(dir) - self.assertEquals(stat.S_IMODE(stat_dir.st_mode), - objstorage.DIR_MODE) - stat_res = os.stat(self.obj_path) - self.assertEquals(stat.S_IMODE(stat_res.st_mode), objstorage.FILE_MODE) - os.umask(old_umask) - - @istest - def check_not_gzip(self): - self.storage.add_bytes(self.content, obj_id=self.obj_id) - with open(self.obj_path, 'ab') as f: # add trailing garbage - f.write(b'garbage') - with self.assertRaises(exc.Error): - self.storage.check(self.obj_id) - - @istest - def check_id_mismatch(self): - self.storage.add_bytes(self.content, obj_id=self.obj_id) - with gzip.open(self.obj_path, 'wb') as f: # replace gzipped content - f.write(b'unexpected content') - with self.assertRaises(exc.Error): - self.storage.check(self.obj_id) - - @istest - def get_bytes(self): - self.storage.add_bytes(self.content, obj_id=self.obj_id) - self.assertEqual(self.storage.get_bytes(self.obj_id), - self.content) - - @istest - def get_file_path(self): - self.storage.add_bytes(self.content, obj_id=self.obj_id) - path = self.storage._get_file_path(self.obj_id) - self.assertEqual(os.path.basename(path), self.hex_obj_id) - self.assertEqual(gzip.open(path, 'rb').read(), self.content) - - @istest - def get_missing(self): - with self.assertRaises(exc.Error): - with self.storage.get_file_obj(self.missing_obj_id) as f: - f.read() - - @istest - def iter(self): - self.assertEqual(list(iter(self.storage)), []) - self.storage.add_bytes(self.content, obj_id=self.obj_id) - self.assertEqual(list(iter(self.storage)), [self.obj_id]) - - @istest - def len(self): - self.assertEqual(len(self.storage), 0) - self.storage.add_bytes(self.content, obj_id=self.obj_id) - self.assertEqual(len(self.storage), 1) diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py index 284e1834..6676cd7c 100644 --- a/swh/storage/tests/test_objstorage_api.py +++ b/swh/storage/tests/test_objstorage_api.py @@ -1,83 +1,97 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import os import tempfile import unittest from nose.tools import istest from nose.plugins.attrib import attr from swh.core import hashutil from swh.storage.exc import ObjNotFoundError, Error from swh.storage.tests.server_testing import ServerTestFixture -from swh.storage.objstorage.objstorage import _obj_path from 
swh.storage.objstorage.api.client import RemoteObjStorage from swh.storage.objstorage.api.server import app @attr('db') class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase): """ Test the remote archive API. """ def setUp(self): self.config = {'storage_base': tempfile.mkdtemp(), 'storage_depth': 3} self.app = app super().setUp() self.objstorage = RemoteObjStorage(self.url()) def tearDown(self): super().tearDown() @istest def content_add(self): content = bytes('Test content', 'utf8') id = self.objstorage.content_add(content) self.assertEquals(self.objstorage.content_get(id), content) @istest def content_get_present(self): content = bytes('content_get_present', 'utf8') content_hash = hashutil.hashdata(content) id = self.objstorage.content_add(content) self.assertEquals(content_hash['sha1'], id) @istest def content_get_missing(self): content = bytes('content_get_missing', 'utf8') content_hash = hashutil.hashdata(content) with self.assertRaises(ObjNotFoundError): self.objstorage.content_get(content_hash['sha1']) + @istest + def content_get_random(self): + ids = [] + for i in range(100): + content = bytes('content_get_present', 'utf8') + id = self.objstorage.content_add(content) + ids.append(id) + for id in self.objstorage.content_get_random(50): + self.assertIn(id, ids) + @istest def content_check_invalid(self): content = bytes('content_check_invalid', 'utf8') id = self.objstorage.content_add(content) - path = _obj_path(hashutil.hash_to_hex(id), - self.app.config['storage_base'], - self.app.config['storage_depth']) + hex_obj_id = hashutil.hash_to_hex(id) + dir_path = os.path.join( + self.config['storage_base'], + *[hex_obj_id[i*2:i*2+2] + for i in range(int(self.config['storage_depth']))] + ) + path = os.path.join(dir_path, hex_obj_id) content = list(content) with open(path, 'bw') as f: content[0] = (content[0] + 1) % 128 f.write(bytes(content)) with self.assertRaises(Error): self.objstorage.content_check(id) @istest def content_check_valid(self): content = bytes('content_check_valid', 'utf8') id = self.objstorage.content_add(content) try: self.objstorage.content_check(id) except: self.fail('Integrity check failed') @istest def content_check_missing(self): content = bytes('content_check_valid', 'utf8') content_hash = hashutil.hashdata(content) with self.assertRaises(ObjNotFoundError): self.objstorage.content_check(content_hash['sha1']) diff --git a/swh/storage/tests/test_objstorage_pathslicing.py b/swh/storage/tests/test_objstorage_pathslicing.py new file mode 100644 index 00000000..cc17a52e --- /dev/null +++ b/swh/storage/tests/test_objstorage_pathslicing.py @@ -0,0 +1,78 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import tempfile +import unittest + +from nose.tools import istest + +from swh.core import hashutil +from swh.storage import exc +from swh.storage.objstorage import PathSlicingObjStorage + +from objstorage_testing import ObjStorageTestFixture + + +class TestpathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): + + def setUp(self): + super().setUp() + self.depth = 3 + self.slicing = 2 + self.tmpdir = tempfile.mkdtemp() + self.storage = PathSlicingObjStorage(self.tmpdir, self.depth, + self.slicing) + + def content_path(self, obj_id): + hex_obj_id = hashutil.hash_to_hex(obj_id) + return self.storage._obj_path(hex_obj_id) + + @istest + def 
contains(self): + content_p, obj_id_p = self.hash_content(b'contains_present') + content_m, obj_id_m = self.hash_content(b'contains_missing') + self.storage.add(content_p, obj_id=obj_id_p) + self.assertIn(obj_id_p, self.storage) + self.assertNotIn(obj_id_m, self.storage) + + @istest + def iter(self): + content, obj_id = self.hash_content(b'iter') + self.assertEqual(list(iter(self.storage)), []) + self.storage.add(content, obj_id=obj_id) + self.assertEqual(list(iter(self.storage)), [obj_id]) + + @istest + def len(self): + content, obj_id = self.hash_content(b'check_not_gzip') + self.assertEqual(len(self.storage), 0) + self.storage.add(content, obj_id=obj_id) + self.assertEqual(len(self.storage), 1) + + @istest + def check_not_gzip(self): + content, obj_id = self.hash_content(b'check_not_gzip') + self.storage.add(content, obj_id=obj_id) + with open(self.content_path(obj_id), 'ab') as f: # Add garbage. + f.write(b'garbage') + with self.assertRaises(exc.Error): + self.storage.check(obj_id) + + @istest + def check_id_mismatch(self): + content, obj_id = self.hash_content(b'check_id_mismatch') + self.storage.add(content, obj_id=obj_id) + with open(self.content_path(obj_id), 'wb') as f: + f.write(b'unexpected content') + with self.assertRaises(exc.Error): + self.storage.check(obj_id) + + @istest + def get_random_contents(self): + content, obj_id = self.hash_content(b'get_random_content') + self.storage.add(content, obj_id=obj_id) + random_contents = list(self.storage.get_random(1)) + self.assertEqual(1, len(random_contents)) + self.assertIn(obj_id, random_contents) diff --git a/version.txt b/version.txt index 04de3f6a..c4346227 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.39-0-g60a0be9 \ No newline at end of file +v0.0.40-0-g8c6e8d7 \ No newline at end of file
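
For reference, a minimal usage sketch of the new PathSlicingObjStorage API introduced by this release follows. It is not part of the patch itself: the temporary root directory and the sample content are placeholders chosen for illustration, and the depth/slicing values simply mirror what Storage now uses (depth=3, slicing=2).

# Illustrative sketch only; assumes swh.storage 0.0.40 and swh.core are installed.
import tempfile

from swh.storage.objstorage import PathSlicingObjStorage

root = tempfile.mkdtemp()  # the root directory must already exist
storage = PathSlicingObjStorage(root, depth=3, slicing=2)

obj_id = storage.add(b'some content')          # sha1 computed on the fly when obj_id is omitted
assert obj_id in storage                       # __contains__ checks the sliced path on disk
assert storage.get(obj_id) == b'some content'  # content is transparently gunzipped
storage.check(obj_id)                          # raises Error if the stored object is corrupted

# get_random() yields object ids sampled from randomly chosen leaf directories.
sample = list(storage.get_random(10))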