diff --git a/PKG-INFO b/PKG-INFO index 5ad7def4b..fa32041a8 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.60 +Version: 0.0.61 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/control b/debian/control index cd7522351..9b8aa7d3c 100644 --- a/debian/control +++ b/debian/control @@ -1,42 +1,42 @@ Source: swh-storage Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-dateutil, python3-flask, python3-nose, python3-psycopg2, python3-requests, python3-setuptools, - python3-swh.core (>= 0.0.23~), + python3-swh.core (>= 0.0.24~), python3-swh.objstorage (>= 0.0.11~), python3-vcversioner, python3-swh.scheduler, python3-click Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSTO/ Package: python3-swh.storage Architecture: all -Depends: python3-swh.core (>= 0.0.23~), python3-swh.objstorage (>= 0.0.11~), ${misc:Depends}, ${python3:Depends} +Depends: python3-swh.core (>= 0.0.24~), python3-swh.objstorage (>= 0.0.11~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage utilities Package: python3-swh.storage.listener Architecture: all Depends: python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends}, python3-kafka (>= 1.3.1~) Description: Software Heritage storage listener Package: python3-swh.storage.archiver Architecture: all Depends: python3-swh.storage (= ${binary:Version}), python3-swh.scheduler, ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage Archiver Package: python3-swh.storage.provenance Architecture: all Depends: python3-swh.storage (= ${binary:Version}), python3-swh.scheduler, ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage Provenance diff --git a/requirements.txt b/requirements.txt index 10b589bd0..d7071835d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,16 +1,16 @@ dateutil psycopg2 vcversioner # remote storage API client requests # remote storage API server flask # Internal dependencies -swh.core >= 0.0.23 +swh.core >= 0.0.24 swh.objstorage >= 0.0.11 click swh.scheduler diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 5ad7def4b..fa32041a8 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.60 +Version: 0.0.61 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index 8af8be9f2..4c8c21b4f 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,9 +1,9 @@ click dateutil flask psycopg2 requests -swh.core>=0.0.23 +swh.core>=0.0.24 swh.objstorage>=0.0.11 swh.scheduler vcversioner diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py index b4e67c22f..53e517636 100644 --- a/swh/storage/archiver/copier.py +++ b/swh/storage/archiver/copier.py @@ -1,49 +1,56 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging +from swh.core import hashutil +from swh.objstorage.exc import ObjNotFoundError logger = logging.getLogger('archiver.worker.copier') class ArchiverCopier(): """ This archiver copy some files into a remote objstorage in order to get a backup. """ def __init__(self, source, destination, content_ids): """ Create a Copier for the archiver Args: source (ObjStorage): source storage to get the contents. destination (ObjStorage): Storage where the contents will be copied. content_ids: list of content's id to archive. """ self.source = source self.destination = destination self.content_ids = content_ids def run(self): """ Do the copy on the backup storage. Run the archiver copier in order to copy the required content into the current destination. The content which corresponds to the sha1 in self.content_ids will be fetched from the master_storage and then copied into the backup object storage. Returns: A boolean that indicates if the whole content have been copied. """ try: for content_id in self.content_ids: - content = self.source.get(content_id) + try: + content = self.source.get(content_id) + except ObjNotFoundError: + logging.error('content %s not found' % + hashutil.hash_to_hex(content_id)) + continue self.destination.add(content, content_id) except Exception as e: logging.error('Problem during copy: %s' % e) return False return True diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py index 2ad8c1cc7..a4611d96b 100644 --- a/swh/storage/archiver/db.py +++ b/swh/storage/archiver/db.py @@ -1,230 +1,249 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time +from swh.core import hashutil from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure class ArchiverDb(BaseDb): """Proxy to the SWH's archiver DB """ def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ cur = self._cursor(cur) cur.execute("SELECT * FROM archive") yield from cursor_to_bytes(cur) def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content in a specific server. Retrieve from the database the archival status of the given content in the given archive server. Args: content_id: the sha1 of the content. Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id = %s ORDER BY content_id """ cur = self._cursor(cur) cur.execute(query, (content_id,)) row = cur.fetchone() if not row: return None content_id, present, ongoing, mtimes = row return (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s AND num_present < %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, retention_policy, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) @stored_procedure('swh_mktemp_content_archive') def mktemp_content_archive(self, cur=None): """Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction. Use from archiver.storage module: self.db.mktemp_content_archive() # copy data over to the temp table self.db.copy_to([{'colname': id0}, {'colname': id1}], 'tmp_cache_content', ['colname'], cur) """ pass def content_archive_get_missing(self, backend_name, cur=None): """Retrieve the content missing from backend_name. """ cur = self._cursor(cur) cur.execute("select * from swh_content_archive_missing(%s)", (backend_name,)) yield from cursor_to_bytes(cur) + def content_archive_insert(self, content_id, source, status, cur=None): + """Insert a new entry in the db for the content_id. + + Args: + content_id: content concerned + source: name of the source + status: the status of the content for that source + + """ + if isinstance(content_id, bytes): + content_id = '\\x%s' % hashutil.hash_to_hex(content_id) + + query = """INSERT INTO content_archive(content_id, copies, num_present) + VALUES('%s', '{"%s": {"status": "%s", "mtime": %d}}', 1) + """ % (content_id, source, status, int(time.time())) + cur = self._cursor(cur) + cur.execute(query) + def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ if new_status is not None: query = """UPDATE content_archive SET copies=jsonb_set( copies, '{%s}', '{"status":"%s", "mtime":%d}' ) WHERE content_id='%s' """ % (archive_id, new_status, int(time.time()), content_id) else: query = """ UPDATE content_archive SET copies=jsonb_set(copies, '{%s,mtime}', '%d') WHERE content_id='%s' """ % (archive_id, int(time.time())) cur = self._cursor(cur) cur.execute(query) diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py index 24d4128cb..b6dfebe37 100644 --- a/swh/storage/archiver/director.py +++ b/swh/storage/archiver/director.py @@ -1,229 +1,229 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import click import sys from swh.core import config, utils, hashutil from swh.scheduler.celery_backend.config import app from . import tasks # noqa from .storage import ArchiverStorage class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta): """Abstract Director class An archiver director is in charge of dispatching batch of contents to archiver workers (for them to archive). Inherit from this class and provide: - ADDITIONAL_CONFIG: Some added configuration needed for the director to work - CONFIG_BASE_FILENAME: relative path to lookup for the configuration file - def get_contents_to_archive(self): Implementation method to read contents to archive """ DEFAULT_CONFIG = { 'batch_max_size': ('int', 1500), 'asynchronous': ('bool', True), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest') } # Destined to be overridden by subclass ADDITIONAL_CONFIG = {} # We use the same configuration file as the worker CONFIG_BASE_FILENAME = 'archiver/worker' # The worker's task queue name to use TASK_NAME = None def __init__(self): """ Constructor of the archiver director. Args: db_conn_archiver: Either a libpq connection string, or a psycopg2 connection for the archiver db. config: optionnal additional configuration. Keys in the dict will override the one parsed from the configuration file. """ super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.archiver_storage = ArchiverStorage(self.config['dbconn']) def run(self): """ Run the archiver director. The archiver director will check all the contents of the archiver database and do the required backup jobs. """ if self.config['asynchronous']: run_fn = self.run_async_worker else: run_fn = self.run_sync_worker for batch in self.read_batch_contents(): run_fn(batch) def run_async_worker(self, batch): """ Produce a worker that will be added to the task queue. """ task = app.tasks[self.TASK_NAME] task.delay(batch=batch) def run_sync_worker(self, batch): """ Run synchronously a worker on the given batch. """ task = app.tasks[self.TASK_NAME] task(batch=batch) def read_batch_contents(self): """ Create batch of contents that needs to be archived Yields: batch of sha1 that corresponds to contents that needs more archive copies. """ contents = [] for content in self.get_contents_to_archive(): contents.append(content) if len(contents) > self.config['batch_max_size']: yield contents contents = [] if len(contents) > 0: yield contents @abc.abstractmethod def get_contents_to_archive(self): """Retrieve generator of sha1 to archive Yields: sha1 to archive """ pass class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase): """Process the files in order to know which one is needed as backup. The archiver director processes the files in the local storage in order to know which one needs archival and it delegates this task to archiver workers. """ ADDITIONAL_CONFIG = { 'retention_policy': ('int', 2), } TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask' def get_contents_to_archive(self): """Create batch of contents that needs to be archived Yields: Datas about a content as a tuple (content_id, present_copies, ongoing_copies) where ongoing_copies is a dict mapping copy to mtime. """ last_content = None while True: archiver_contents = list( self.archiver_storage.content_archive_get_unarchived_copies( last_content=last_content, retention_policy=self.config['retention_policy'])) if not archiver_contents: return for content_id, _, _ in archiver_contents: last_content = content_id yield content_id def read_sha1_from_stdin(): """Read sha1 from stdin. """ for sha1 in sys.stdin: yield {'content_id': hashutil.hex_to_hash(sha1.rstrip())} class ArchiverStdinToBackendDirector(ArchiverDirectorBase): """A cloud archiver director in charge of reading contents and send them in batch in the cloud. The archiver director processes the files in the local storage in order to know which one needs archival and it delegates this task to archiver workers. """ ADDITIONAL_CONFIG = { - 'destination_host': ('str', 'azure'), + 'destination': ('str', 'azure'), 'force_copy': ('bool', False), } CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask' def __init__(self): super().__init__() - self.destination_host = self.config['destination_host'] + self.destination = self.config['destination'] self.force_copy = self.config['force_copy'] def get_contents_to_archive(self): gen_content_ids = ( ids for ids in utils.grouper(read_sha1_from_stdin(), self.config['batch_max_size']) ) if self.force_copy: for content_ids in gen_content_ids: content_ids = list(content_ids) if not content_ids: continue print('Send %s contents to archive' % len(content_ids)) for content in content_ids: yield content['content_id'] else: for content_ids in gen_content_ids: content_ids = list( self.archiver_storage.content_archive_get_missing( content_ids=content_ids, - backend_name=self.destination_host)) + backend_name=self.destination)) if not content_ids: continue print('Send %s contents to archive' % len(content_ids)) for content in content_ids: yield content @click.command() @click.option('--direct', is_flag=True, help="""The archiver sends content for backup to one storage.""") def launch(direct): if direct: archiver = ArchiverStdinToBackendDirector() else: archiver = ArchiverWithRetentionPolicyDirector() archiver.run() if __name__ == '__main__': launch() diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py index 02d2351a2..1336c17af 100644 --- a/swh/storage/archiver/storage.py +++ b/swh/storage/archiver/storage.py @@ -1,136 +1,148 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 from .db import ArchiverDb from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError class ArchiverStorage(): """SWH Archiver storage proxy, encompassing DB """ def __init__(self, db_conn): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ try: if isinstance(db_conn, psycopg2.extensions.connection): self.db = ArchiverDb(db_conn) else: self.db = ArchiverDb.connect(db_conn) except psycopg2.OperationalError as e: raise StorageDBError(e) @db_transaction_generator def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ yield from self.db.archive_ls(cur) @db_transaction def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content. Retrieve from the database the archival status of the given content Args: content_id: the sha1 of the content Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ return self.db.content_archive_get(content_id, cur) @db_transaction_generator def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from self.db.content_archive_get_copies(last_content, limit, cur) @db_transaction_generator def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from self.db.content_archive_get_unarchived_copies( retention_policy, last_content, limit, cur) @db_transaction_generator def content_archive_get_missing(self, content_ids, backend_name, cur=None): """Retrieve the list of missing copies from source_name. Args: content_ids ([sha1s]): list of sha1s to test source_name (str): Name of the backend to check for content Yields: List of ids effectively missing from backend_name """ db = self.db db.mktemp_content_archive() db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_missing(backend_name, cur): yield content_id[0] @db_transaction def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to now Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ self.db.content_archive_update(content_id, archive_id, new_status, cur) + + @db_transaction + def content_archive_insert(self, content_id, source, status, cur=None): + """Insert a new entry in db about content_id. + + Args: + content_id: content concerned + source: name of the source + status: the status of the content for that source + + """ + self.db.content_archive_insert(content_id, source, status, cur) diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py index 86f343a0b..4f797e530 100644 --- a/swh/storage/archiver/worker.py +++ b/swh/storage/archiver/worker.py @@ -1,373 +1,413 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging import random import time from collections import defaultdict from swh.objstorage import get_objstorage from swh.core import hashutil, config from swh.objstorage.exc import Error, ObjNotFoundError from .storage import ArchiverStorage from .copier import ArchiverCopier logger = logging.getLogger('archiver.worker') class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta): """Base archive worker. Inherit from this class and override: - ADDITIONAL_CONFIG: Some added configuration needed for the director to work - CONFIG_BASE_FILENAME: relative path to lookup for the configuration file - def need_archival(self, content_data): Determine if a content needs archival or not - def choose_backup_servers(self, present, missing): Choose which backup server to send copies to """ DEFAULT_CONFIG = { 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev'), + 'source': ('str', 'uffizi'), 'storages': ('list[dict]', [ {'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}} ]) } ADDITIONAL_CONFIG = {} CONFIG_BASE_FILENAME = 'archiver/worker' objstorages = {} def __init__(self, batch): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.batch = batch self.archiver_db = ArchiverStorage(self.config['dbconn']) self.objstorages = { storage['host']: get_objstorage(storage['cls'], storage['args']) for storage in self.config.get('storages', []) } + self.set_objstorages = set(self.objstorages) + # Fallback objstorage + self.source = self.config['source'] def run(self): """Do the task expected from the archiver worker. Process the contents in self.batch, ensure that the elements still need an archival (using archiver db), and spawn copiers to copy files in each destination according to the archiver-worker's policy. """ transfers = defaultdict(list) - set_objstorages = set(self.objstorages) for obj_id in self.batch: # Get dict {'missing': [servers], 'present': [servers]} # for contents ignoring those who don't need archival. - copies = self.compute_copies(set_objstorages, obj_id) + copies = self.compute_copies(self.set_objstorages, obj_id) if not copies: - logger.warning('Unknown content archiver-wise %s' % - hashutil.hash_to_hex(obj_id)) - continue + # could happen if archiver db lags behind + copies = self.compute_fallback_copies( + self.source, self.set_objstorages, obj_id) + if not copies: + msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id) + logger.warning(msg) + continue + if not self.need_archival(copies): continue + present = copies.get('present', []) missing = copies.get('missing', []) if len(present) == 0: - logger.critical('Lost content %s' % - hashutil.hash_to_hex(obj_id)) + msg = 'Lost content %s' % hashutil.hash_to_hex(obj_id) + logger.critical(msg) continue + # Choose servers to be used as srcs and dests. for src_dest in self.choose_backup_servers(present, missing): transfers[src_dest].append(obj_id) # Then run copiers for each of the required transfers. for (src, dest), content_ids in transfers.items(): self.run_copier(src, dest, content_ids) + def compute_fallback_copies(self, source, set_objstorages, content_id): + """Compute fallback copies for content_id. + + Args: + source: the objstorage where the content_id is supposedly present + set_objstorages: the complete set of objstorages + content_id: the content concerned + + Returns: + A dictionary with keys 'present' and 'missing' that are + mapped to lists of copies ids depending on whenever the + content is present or missing on the copy. + + There is also the key 'ongoing' which is associated with a + dict that map to a copy name the mtime of the ongoing + status update. + + """ + if content_id not in self.objstorages[source]: + return None + + # insert a new entry about of the content_id's presence for that source + self.archiver_db.content_archive_insert( + content_id=content_id, source=self.source, status='present') + + # Now compute the fallback copies + set_present = {self.source} + set_missing = set_objstorages - set_present + return { + 'present': set_present, + 'missing': set_missing, + 'ongoing': {} + } + def compute_copies(self, set_objstorages, content_id): """From a content_id, return present and missing copies. Args: objstorages (set): objstorage's id name content_id: the content concerned Returns: - A dictionary with keys 'present' and 'missing'. - that are mapped to lists of copies ids depending on - whenever the content is present or missing on the copy. + A dictionary with keys 'present' and 'missing' that are + mapped to lists of copies ids depending on whenever the + content is present or missing on the copy. There is also the key 'ongoing' which is associated with a dict that map to a copy name the mtime of the ongoing status update. """ result = self.archiver_db.content_archive_get(content_id) if not result: return None _, present, ongoing = result set_present = set(present) set_ongoing = set(ongoing) set_missing = set_objstorages - set_present - set_ongoing return { 'present': set_present, 'missing': set_missing, 'ongoing': ongoing } def run_copier(self, source, destination, content_ids): """Run a copier in order to archive the given contents. Upload the given contents from the source to the destination. If the process fails, the whole content is considered uncopied and remains 'ongoing', waiting to be rescheduled as there is a delay. Args: source (str): source storage's identifier destination (str): destination storage's identifier content_ids ([sha1]): list of content ids to archive. """ # Check if there are any errors among the contents. content_status = self.get_contents_error(content_ids, source) # Iterates over the error detected. for content_id, real_status in content_status.items(): # Remove them from the to-archive list, # as they cannot be retrieved correctly. content_ids.remove(content_id) # Update their status to reflect their real state. self.content_archive_update( content_id, archive_id=source, new_status=real_status) # Now perform the copy on the remaining contents ac = ArchiverCopier( source=self.objstorages[source], destination=self.objstorages[destination], content_ids=content_ids) if ac.run(): # Once the archival complete, update the database. for content_id in content_ids: self.content_archive_update( content_id, archive_id=destination, new_status='present') def get_contents_error(self, content_ids, source_storage): """Indicates what is the error associated to a content when needed Check the given content on the given storage. If an error is detected, it will be reported through the returned dict. Args: content_ids ([sha1]): list of content ids to check source_storage (str): the source storage holding the contents to check. Returns: a dict that map {content_id -> error_status} for each content_id with an error. The `error_status` result may be 'missing' or 'corrupted'. """ content_status = {} storage = self.objstorages[source_storage] for content_id in content_ids: try: storage.check(content_id) except Error: content_status[content_id] = 'corrupted' logger.error('%s corrupted!' % hashutil.hash_to_hex( content_id)) except ObjNotFoundError: content_status[content_id] = 'missing' logger.error('%s missing!' % hashutil.hash_to_hex(content_id)) return content_status def content_archive_update(self, content_id, archive_id, new_status=None): """Update the status of a archive content and set its mtime to now. Change the last modification time of an archived content and change its status to the given one. Args: content_id (str): The content id. archive_id (str): The id of the concerned archive. new_status (str): One of missing, ongoing or present, this status will replace the previous one. If not given, the function only changes the mtime of the content. """ db_obj_id = r'\x' + hashutil.hash_to_hex(content_id) self.archiver_db.content_archive_update( db_obj_id, archive_id, new_status ) @abc.abstractmethod def need_archival(self, content_data): """Indicate if the content needs to be archived. Args: content_data (dict): dict that contains two lists 'present' and 'missing' with copies id corresponding to this status. Returns: True if there is not enough copies, False otherwise. """ pass @abc.abstractmethod def choose_backup_servers(self, present, missing): """Choose and yield the required amount of couple source/destination For each required copy, choose a unique destination server among the missing copies and a source server among the presents. Yields: tuple (source (str), destination (src)) for each required copy. """ pass class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker): """ Do the required backups on a given batch of contents. Process the content of a content batch in order to do the needed backups on the slaves servers. """ ADDITIONAL_CONFIG = { 'retention_policy': ('int', 2), 'archival_max_age': ('int', 3600), } def __init__(self, batch): """ Constructor of the ArchiverWorker class. Args: batch: list of object's sha1 that potentially need archival. """ super().__init__(batch) config = self.config self.retention_policy = config['retention_policy'] self.archival_max_age = config['archival_max_age'] if len(self.objstorages) < self.retention_policy: raise ValueError('Retention policy is too high for the number of ' 'provided servers') def need_archival(self, content_data): """ Indicate if the content need to be archived. Args: content_data (dict): dict that contains two lists 'present' and 'missing' with copies id corresponding to this status. Returns: True if there is not enough copies, False otherwise. """ nb_presents = len(content_data.get('present', [])) for copy, mtime in content_data.get('ongoing', {}).items(): if not self._is_archival_delay_elasped(mtime): nb_presents += 1 return nb_presents < self.retention_policy def _is_archival_delay_elapsed(self, start_time): """ Indicates if the archival delay is elapsed given the start_time Args: start_time (float): time at which the archival started. Returns: True if the archival delay is elasped, False otherwise """ elapsed = time.time() - start_time return elapsed > self.archival_max_age def choose_backup_servers(self, present, missing): """Choose and yield the required amount of couple source/destination For each required copy, choose a unique destination server among the missing copies and a source server among the presents. Each destination server is unique so after archival, the retention policy requirement will be fulfilled. However, the source server may be used multiple times. Yields: tuple (source, destination) for each required copy. """ # Transform from set to list to allow random selections missing = list(missing) present = list(present) nb_required = self.retention_policy - len(present) destinations = random.sample(missing, nb_required) sources = [random.choice(present) for dest in destinations] yield from zip(sources, destinations) class ArchiverToBackendWorker(BaseArchiveWorker): """Worker that send copies over from a source to another backend. Process the content of a content batch from source objstorage to destination objstorage. """ CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' def __init__(self, batch): """Constructor of the ArchiverWorkerToBackend class. Args: batch: list of object's sha1 that potentially need archival. """ super().__init__(batch) - self.destination_host = self.config['destination_host'] + self.destination = self.config['destination'] def need_archival(self, content_data): """Indicate if the content needs to be archived. Args: content_data (dict): dict that contains 3 lists 'present', 'ongoing' and 'missing' with copies id corresponding to this status. Returns: True if we need to archive, False otherwise """ - if self.destination_host in content_data.get('missing', {}): + if self.destination in content_data.get('missing', {}): return True return False def choose_backup_servers(self, present, missing): - missing = list(missing) - present = list(present) - destinations = random.sample(missing, len(missing)) - sources = [random.choice(present) for dest in destinations] - yield from zip(sources, destinations) + yield (random.choice(present), self.destination) diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py index 812ae51f9..91005c274 100644 --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -1,295 +1,323 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest import os import time import json from nose.tools import istest from nose.plugins.attrib import attr from swh.core import hashutil from swh.core.tests.db_testing import DbsTestFixture from server_testing import ServerTestFixture from swh.storage.archiver import ArchiverWithRetentionPolicyDirector from swh.storage.archiver import ArchiverWithRetentionPolicyWorker from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.api.server import app TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') class TestArchiver(DbsTestFixture, ServerTestFixture, unittest.TestCase): """ Test the objstorage archiver. """ TEST_DB_NAMES = [ 'softwareheritage-archiver-test', ] TEST_DB_DUMPS = [ os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), ] TEST_DB_DUMP_TYPES = [ 'pg_dump', ] def setUp(self): # Launch the backup server dest_root = tempfile.mkdtemp(prefix='remote') self.config = { 'cls': 'pathslicing', 'args': { 'root': dest_root, 'slicing': '0:2/2:4/4:6', } } self.app = app super().setUp() # Retrieve connection (depends on the order in TEST_DB_NAMES) self.conn = self.conns[0] # archiver db's connection self.cursor = self.cursors[0] # Create source storage src_root = tempfile.mkdtemp() src_config = { 'cls': 'pathslicing', 'args': { 'root': src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) # Create destination storage dest_config = { 'cls': 'remote', 'args': { 'base_url': self.url() } } self.dest_storage = get_objstorage(**dest_config) # Keep mapped the id to the storages self.storages = { 'uffizi': self.src_storage, 'banco': self.dest_storage } # Override configurations src_archiver_conf = {'host': 'uffizi'} dest_archiver_conf = {'host': 'banco'} src_archiver_conf.update(src_config) dest_archiver_conf.update(dest_config) self.archiver_storages = [src_archiver_conf, dest_archiver_conf] self._override_director_config() self._override_worker_config() # Create the base archiver self.archiver = self._create_director() def tearDown(self): self.empty_tables() super().tearDown() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content_archive') self.conn.commit() def _override_director_config(self, retention_policy=2): """ Override the default config of the Archiver director to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa 'dbconn': self.conn, 'batch_max_size': 5000, 'archival_max_age': 3600, 'retention_policy': retention_policy, - 'asynchronous': False + 'asynchronous': False, } def _override_worker_config(self): """ Override the default config of the Archiver worker to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa 'retention_policy': 2, 'archival_max_age': 3600, 'dbconn': self.conn, - 'storages': self.archiver_storages + 'storages': self.archiver_storages, + 'source': 'uffizi', } def _create_director(self): return ArchiverWithRetentionPolicyDirector() def _create_worker(self, batch={}): return ArchiverWithRetentionPolicyWorker(batch) def _add_content(self, storage_name, content_data): """ Add really a content to the given objstorage This put an empty status for the added content. + + Args: + storage_name: the concerned storage + content_data: the data to insert + with_row_insert: to insert a row entry in the db or not + """ # Add the content to the storage obj_id = self.storages[storage_name].add(content_data) db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) self.cursor.execute(""" INSERT INTO content_archive VALUES('%s', '{}') """ % (db_obj_id)) return obj_id def _update_status(self, obj_id, storage_name, status, date=None): """ Update the db status for the given id/storage_name. This does not create the content in the storage. """ db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) self.archiver.archiver_storage.content_archive_update( db_obj_id, storage_name, status ) def _add_dated_content(self, obj_id, copies={}): """ Fully erase the previous copies field for the given content id This does not alter the contents into the objstorages. """ db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) self.cursor.execute(""" UPDATE TABLE content_archive SET copies='%s' WHERE content_id='%s' """ % (json.dumps(copies), db_obj_id)) # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ obj_data = b'archive_missing_content' obj_id = self._add_content('uffizi', obj_data) self._update_status(obj_id, 'uffizi', 'present') # Content is missing on banco (entry not present in the db) try: self.dest_storage.get(obj_id) except ObjNotFoundError: pass else: self.fail('Content should not be present before archival') self.archiver.run() # now the content should be present on remote objstorage remote_data = self.dest_storage.get(obj_id) self.assertEquals(obj_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_present_content') self._update_status(obj_id, 'uffizi', 'present') self._update_status(obj_id, 'banco', 'present') # After the run, the content should NOT be in the archive. # As the archiver believe it was already in. self.archiver.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self._override_director_config(retention_policy=1) director = self._create_director() # Obj is present in only one archive but only one copy is required. director.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) # Unit tests for archive worker def archival_elapsed(self, mtime): return self._create_worker()._is_archival_delay_elapsed(mtime) @istest def vstatus_ongoing_remaining(self): self.assertFalse(self.archival_elapsed(time.time())) @istest def vstatus_ongoing_elapsed(self): past_time = ( time.time() - self._create_worker().archival_max_age ) self.assertTrue(self.archival_elapsed(past_time)) def _status(self, status, mtime=None): """ Get a dict that match the copies structure """ return {'status': status, 'mtime': mtime or time.time()} @istest def need_archival_missing(self): """ A content should need archival when it is missing. """ status_copies = {'present': ['uffizi'], 'missing': ['banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), True) @istest def need_archival_present(self): """ A content present everywhere shouldn't need archival """ status_copies = {'present': ['uffizi', 'banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), False) def _compute_copies_status(self, status): """ A content with a given status should be detected correctly """ obj_id = self._add_content( 'banco', b'compute_copies_' + bytes(status, 'utf8')) self._update_status(obj_id, 'banco', status) worker = self._create_worker() self.assertIn('banco', worker.compute_copies( set(worker.objstorages), obj_id)[status]) @istest def compute_copies_present(self): """ A present content should be detected with correct status """ self._compute_copies_status('present') @istest def compute_copies_missing(self): """ A missing content should be detected with correct status """ self._compute_copies_status('missing') def _get_backups(self, present, missing): """ Return a list of the pair src/dest from the present and missing """ worker = self._create_worker() return list(worker.choose_backup_servers(present, missing)) @istest def choose_backup_servers(self): self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) # Even with more possible destinations, do not take more than the # retention_policy require self.assertEqual( len(self._get_backups(['uffizi'], ['banco', 's3'])), 1 ) + + # This cannot be tested with ArchiverWithRetentionPolicyDirector + # (it reads from archiver db) + # @istest + # def archive_missing_content__without_row_entry_in_archive_db(self): + # """ Run archiver on a missing content should archive it. + # """ + # obj_data = b'archive_missing_content_without_row_entry_in_archive_db' + # obj_id = self._add_content('uffizi', obj_data) + # # One entry in archiver db but no status about its whereabouts + # # Content is actually missing on banco but present on uffizi + # try: + # self.dest_storage.get(obj_id) + # except ObjNotFoundError: + # pass + # else: + # self.fail('Content should not be present before archival') + # self.archiver.run() + # # now the content should be present on remote objstorage + # remote_data = self.dest_storage.get(obj_id) + # self.assertEquals(obj_data, remote_data) diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py index feee6f10d..dbb576a63 100644 --- a/swh/storage/vault/api/cooking_tasks.py +++ b/swh/storage/vault/api/cooking_tasks.py @@ -1,24 +1,31 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from swh.core import hashutil from ..cache import VaultCache from ..cooker import DirectoryVaultCooker from ... import get_storage +COOKER_TYPES = { + 'directory': DirectoryVaultCooker +} + + class SWHCookingTask(Task): - """ Main task that archive a batch of content. + """Main task which archives a contents batch. + """ task_queue = 'swh_storage_vault_cooking' - def run(self, hex_dir_id, storage_args, cache_args): + def run(self, type, hex_id, storage_args, cache_args): + # Initialize elements storage = get_storage(**storage_args) cache = VaultCache(**cache_args) - directory_cooker = DirectoryVaultCooker(storage, cache) - - dir_id = hashutil.hex_to_hash(hex_dir_id) - directory_cooker.cook(dir_id) + # Initialize cooker + cooker = COOKER_TYPES[type](storage, cache) + # Perform the cooking + cooker.cook(obj_id=hashutil.hex_to_hash(hex_id)) diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py index 075c751b4..ba9e6a690 100644 --- a/swh/storage/vault/api/server.py +++ b/swh/storage/vault/api/server.py @@ -1,92 +1,101 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click from flask import Flask, abort, g +from werkzeug.routing import BaseConverter from swh.core import config from swh.objstorage.api.common import encode_data_server as encode_data from swh.objstorage.api.common import BytesRequest, error_handler from swh.storage import get_storage from swh.storage.vault.api import cooking_tasks # NOQA from swh.storage.vault.cache import VaultCache from swh.storage.vault.cooker import DirectoryVaultCooker from swh.scheduler.celery_backend.config import app as celery_app -from flask_profile import Profiler cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask' DEFAULT_CONFIG = { 'storage': ('dict', {'storage_class': 'local_storage', 'storage_args': [ 'dbname=softwareheritage-dev', '/tmp/objects' ] }), 'cache': ('dict', {'root': '/tmp/vaultcache'}) } +class RegexConverter(BaseConverter): + def __init__(self, url_map, *items): + super().__init__(url_map) + self.regex = items[0] + + app = Flask(__name__) -Profiler(app) app.request_class = BytesRequest +app.url_map.converters['regex'] = RegexConverter @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.before_request def before_request(): g.cache = VaultCache(**app.config['cache']) g.cooker = DirectoryVaultCooker( get_storage(**app.config['storage']), g.cache ) @app.route('/') def index(): return 'SWH vault API server' -@app.route('/vault/directory/', methods=['GET']) -def ls_directory(): +@app.route('/vault//', + methods=['GET']) +def ls_directory(type): return encode_data(list( - g.cache.ls('directory') + g.cache.ls(type) )) -@app.route('/vault/directory//', methods=['GET']) -def get_cooked_directory(dir_id): - if not g.cache.is_cached('directory', dir_id): +@app.route('/vault///', + methods=['GET']) +def get_cooked_directory(type, id): + if not g.cache.is_cached(type, id): abort(404) - return encode_data(g.cache.get('directory', dir_id).decode()) + return encode_data(g.cache.get(type, id).decode()) -@app.route('/vault/directory//', methods=['POST']) -def cook_request_directory(dir_id): +@app.route('/vault///', + methods=['POST']) +def cook_request_directory(type, id): task = celery_app.tasks[cooking_task_name] - task.delay(dir_id, app.config['storage'], app.config['cache']) + task.delay(type, id, app.config['storage'], app.config['cache']) # Return url to get the content and 201 CREATED - return encode_data('/vault/directory/dir_id/'), 201 + return encode_data('/vault/%s/%s/' % (type, id)), 201 @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5000, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app.config.update(config.read(config_path, DEFAULT_CONFIG)) app.run(host, port=int(port), debug=bool(debug)) if __name__ == '__main__': launch() diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py index 588682ae0..cbc662c8a 100644 --- a/swh/storage/vault/cache.py +++ b/swh/storage/vault/cache.py @@ -1,64 +1,69 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from swh.core import hashutil from swh.objstorage import get_objstorage from swh.objstorage.objstorage_pathslicing import DIR_MODE BUNDLE_TYPES = { 'directory': 'd', + 'revision': 'r', + 'snapshot': 's', } class VaultCache(): """The vault cache is an object storage that stores bundles The current implementation uses a PathSlicingObjStorage to store the bundles. The id of a content if prefixed to specify its type and store different types of bundle in different folders. """ def __init__(self, root): for subdir in BUNDLE_TYPES.values(): fp = os.path.join(root, subdir) if not os.path.isdir(fp): os.makedirs(fp, DIR_MODE, exist_ok=True) self.storages = { type: get_objstorage( 'pathslicing', {'root': os.path.join(root, subdir), 'slicing': '0:1/0:5'} ) for type, subdir in BUNDLE_TYPES.items() } def __contains__(self, obj_id): - return obj_id in self.storage + for storage in self.storages: + if obj_id in storage: + return True + return False def add(self, obj_type, obj_id, content): storage = self._get_storage(obj_type) return storage.add(content, obj_id) def get(self, obj_type, obj_id): storage = self._get_storage(obj_type) return storage.get(hashutil.hex_to_hash(obj_id)) def is_cached(self, obj_type, obj_id): storage = self._get_storage(obj_type) return hashutil.hex_to_hash(obj_id) in storage def ls(self, obj_type): storage = self._get_storage(obj_type) yield from storage def _get_storage(self, obj_type): """Get the storage that corresponds to the object type""" try: return self.storages[obj_type] except: raise ValueError('Wrong bundle type: ' + obj_type) diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py index b48c4a834..6554a11ae 100644 --- a/swh/storage/vault/cooker.py +++ b/swh/storage/vault/cooker.py @@ -1,173 +1,224 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io +import itertools import os import tarfile import tempfile -import itertools + from swh.core import hashutil + SKIPPED_MESSAGE = (b'This content have not been retrieved in ' b'Software Heritage archive due to its size') HIDDEN_MESSAGE = (b'This content is hidden') class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. + To define a new cooker, inherit from this class and override: + - CACHE_TYPE_KEY: key to use for the bundle to reference in cache + - def cook(obj_id): cook the object into a bundle + - def notify_bundle_ready(notif_data, bundle_id): notify the + bundle is ready. + """ + CACHE_TYPE_KEY = None + + def __init__(self, storage, cache): + self.storage = storage + self.cache = cache + @abc.abstractmethod def cook(self, obj_id): """Cook the requested object into a bundle The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: obj_id: id of the object to be cooked into a bundle. """ - raise NotImplementedError( - 'Vault cookers must implement a `cook` method') + pass + + def update_cache(self, id, bundle_content): + """Update the cache with id and bundle_content. + + """ + self.cache.add(self.CACHE_TYPE_KEY, id, bundle_content) + + @abc.abstractmethod + def notify_bundle_ready(self, notif_data, bundle_id): + """Notify the bundle bundle_id is ready. + + """ + pass class DirectoryVaultCooker(BaseVaultCooker): """Cooker to create a directory bundle """ + CACHE_TYPE_KEY = 'directory' def __init__(self, storage, cache): """Initialize a cooker that create directory bundles Args: storage: source storage where content are retrieved. cache: destination storage where the cooked bundle are stored. """ self.storage = storage self.cache = cache def cook(self, dir_id): """Cook the requested directory into a Bundle Args: dir_id (bytes): the id of the directory to be cooked. Returns: bytes that correspond to the bundle """ - root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'), - 'utf8') - # Retrieve data from the database - data = list(self.storage.directory_ls(dir_id, recursive=True)) + # Create the bytes that corresponds to the compressed + # directory. + directory_cooker = DirectoryCooker(self.storage) + bundle_content = directory_cooker.get_directory_bytes(dir_id) + # Cache the bundle + self.update_cache(dir_id, bundle_content) + # Make a notification that the bundle have been cooked + # NOT YET IMPLEMENTED see TODO in function. + self.notify_bundle_ready( + notif_data='Bundle %s ready' % hashutil.hash_to_hex(dir_id), + bundle_id=dir_id) + + def notify_bundle_ready(self, bundle_id): + # TODO plug this method with the notification method once + # done. + pass + + +class DirectoryCooker(): + """Creates a cooked directory from its sha1_git in the db. + + Warning: This is NOT a directly accessible cooker, but a low-level + one that executes the manipulations. + + """ + def __init__(self, storage): + self.storage = storage + + def get_directory_bytes(self, dir_id): + # Create temporary folder to retrieve the files into. + root = bytes(tempfile.mkdtemp(prefix='directory.', + suffix='.cook'), 'utf8') + # Retrieve data from the database. + data = self.storage.directory_ls(dir_id, recursive=True) + # Split into files and directory data. data1, data2 = itertools.tee(data, 2) dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') file_data = (entry for entry in data2 if entry['type'] == 'file') - # Recreate the directory + # Recreate the directory's subtree and then the files into it. self._create_tree(root, dir_data) self._create_files(root, file_data) - # Use the created directory to get the bundle datas + # Use the created directory to make a bundle with the data as + # a compressed directory. bundle_content = self._create_bundle_content( root, - hashutil.hash_to_hex(dir_id) - ) - self._cache_bundle(dir_id, bundle_content) - - # Make a notification that the bundle have been cooked - self._notify_bundle_ready(dir_id) + hashutil.hash_to_hex(dir_id)) + return bundle_content def _create_tree(self, root, directory_paths): """Create a directory tree from the given paths The tree is created from `root` and each given path in `directory_paths` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = bytes(os.path.sep, 'utf8') dir_names = sorted( directory_paths, - key=lambda x: len(x.split(bsep)) - ) + key=lambda x: len(x.split(bsep))) for dir_name in dir_names: os.makedirs(os.path.join(root, dir_name)) def _create_files(self, root, file_datas): - """Iterates over the file datas and delegate to the right method. + """Create the files according to their status. """ # Then create the files for file_data in file_datas: path = os.path.join(root, file_data['name']) status = file_data['status'] if status == 'absent': self._create_file_absent(path) elif status == 'hidden': self._create_file_hidden(path) else: content = self._get_file_content(file_data['sha1']) self._create_file(path, content) - def _get_file_content(self, obj_id): - content = list(self.storage.content_get([obj_id]))[0]['data'] - return content - def _create_file(self, path, content): - """Create the given file and fill it with content.""" + """Create the given file and fill it with content. + + """ with open(path, 'wb') as f: f.write(content) + def _get_file_content(self, obj_id): + """Get the content of the given file. + + """ + content = list(self.storage.content_get([obj_id]))[0]['data'] + return content + def _create_file_absent(self, path): """Create a file that indicates a skipped content Create the given file but fill it with a specific content to - indicates that the content have not been retrieved by the + indicate that the content have not been retrieved by the software heritage archive due to its size. """ self._create_file(self, SKIPPED_MESSAGE) def _create_file_hidden(self, path): """Create a file that indicates an hidden content Create the given file but fill it with a specific content to - indicates that the content could not be retrieved due to + indicate that the content could not be retrieved due to privacy policy. """ self._create_file(self, HIDDEN_MESSAGE) def _create_bundle_content(self, path, hex_dir_id): """Create a bundle from the given directory Args: path: location of the directory to package. hex_dir_id: hex representation of the directory id Returns: - a path to the newly created archive file. + bytes that represent the compressed directory as a bundle. """ tar_buffer = io.BytesIO() tar = tarfile.open(fileobj=tar_buffer, mode='w') tar.add(path.decode(), arcname=hex_dir_id) return tar_buffer.getbuffer() - - def _cache_bundle(self, dir_id, bundle_content): - self.cache.add('directory', dir_id, bundle_content) - - def _notify_bundle_ready(self, bundle_id): - # TODO plug this method with the notification method once - # done. - pass diff --git a/version.txt b/version.txt index 2b2961717..b1cfcabec 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.60-0-g189e9c1 \ No newline at end of file +v0.0.61-0-g4c3623c \ No newline at end of file