diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py index ef3ad4c..35ce577 100644 --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -1,983 +1,981 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import hashlib import logging import os import psycopg2 import requests import traceback import uuid from abc import ABCMeta, abstractmethod from retrying import retry from typing import Any, Dict, Optional, Tuple from . import converters from swh.core import config from swh.storage import get_storage, HashCollision from .queue import QueuePerSizeAndNbUniqueElements from .queue import QueuePerNbUniqueElements def send_in_packets(objects, sender, packet_size, packet_size_bytes=None): """Send `objects`, using the `sender`, in packets of `packet_size` objects (and of max `packet_size_bytes`). """ formatted_objects = [] count = 0 if not packet_size_bytes: packet_size_bytes = 0 for obj in objects: if not obj: continue formatted_objects.append(obj) if packet_size_bytes: count += obj['length'] if len(formatted_objects) >= packet_size or count > packet_size_bytes: sender(formatted_objects) formatted_objects = [] count = 0 if formatted_objects: sender(formatted_objects) def retry_loading(error): """Retry policy when the database raises an integrity error""" exception_classes = [ # raised when two parallel insertions insert the same data. psycopg2.IntegrityError, HashCollision, # raised when uWSGI restarts and hungs up on the worker. requests.exceptions.ConnectionError, ] if not any(isinstance(error, exc) for exc in exception_classes): return False logger = logging.getLogger('swh.loader') error_name = error.__module__ + '.' + error.__class__.__name__ logger.warning('Retry loading a batch', exc_info=False, extra={ 'swh_type': 'storage_retry', 'swh_exception_type': error_name, 'swh_exception': traceback.format_exception( error.__class__, error, error.__traceback__, ), }) return True class BufferedLoader(config.SWHConfig, metaclass=ABCMeta): """Mixin base class for loader. To use this class, you must: - inherit from this class - and implement the @abstractmethod methods: - :func:`prepare`: First step executed by the loader to prepare some state needed by the `func`:load method. - :func:`get_origin`: Retrieve the origin that is currently being loaded. - :func:`fetch_data`: Fetch the data is actually the method to implement to compute data to inject in swh (through the store_data method) - :func:`store_data`: Store data fetched. - :func:`visit_status`: Explicit status of the visit ('partial' or 'full') - :func:`load_status`: Explicit status of the loading, for use by the scheduler (eventful/uneventful/temporary failure/permanent failure). - :func:`cleanup`: Last step executed by the loader. The entry point for the resulting loader is :func:`load`. You can take a look at some example classes: - :class:`BaseSvnLoader` - :class:`TarLoader` - :class:`DirLoader` - :class:`DebianLoader` """ CONFIG_BASE_FILENAME = None # type: Optional[str] DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', } }), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_snapshot': ('bool', True), 'save_data': ('bool', False), 'save_data_path': ('str', ''), # Number of contents 'content_packet_size': ('int', 10000), # packet of 100Mib contents 'content_packet_size_bytes': ('int', 100 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } # type: Dict[str, Tuple[str, Any]] ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] def __init__(self, logging_class=None, config=None): if config: self.config = config else: self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.storage = get_storage(**self.config['storage']) if logging_class is None: logging_class = '%s.%s' % (self.__class__.__module__, self.__class__.__name__) self.log = logging.getLogger(logging_class) self.contents = QueuePerSizeAndNbUniqueElements( key='sha1', max_nb_elements=self.config['content_packet_size'], max_size=self.config['content_packet_size_bytes']) self.contents_seen = set() self.directories = QueuePerNbUniqueElements( key='id', max_nb_elements=self.config['directory_packet_size']) self.directories_seen = set() self.revisions = QueuePerNbUniqueElements( key='id', max_nb_elements=self.config['revision_packet_size']) self.revisions_seen = set() self.releases = QueuePerNbUniqueElements( key='id', max_nb_elements=self.config['release_packet_size']) self.releases_seen = set() self.snapshot = None _log = logging.getLogger('requests.packages.urllib3.connectionpool') _log.setLevel(logging.WARN) self.counters = { 'contents': 0, 'directories': 0, 'revisions': 0, 'releases': 0, } # Make sure the config is sane save_data = self.config.get('save_data') if save_data: path = self.config['save_data_path'] os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) def save_data(self): """Save the data associated to the current load""" raise NotImplementedError def get_save_data_path(self): """The path to which we archive the loader's raw data""" if not hasattr(self, '__save_data_path'): year = str(self.visit_date.year) url = self.origin['url'].encode('utf-8') origin_url_hash = hashlib.sha1(url).hexdigest() path = '%s/sha1:%s/%s/%s' % ( self.config['save_data_path'], origin_url_hash[0:2], origin_url_hash, year, ) os.makedirs(path, exist_ok=True) self.__save_data_path = path return self.__save_data_path @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_origin(self, origin): log_id = str(uuid.uuid4()) - self.log.debug('Creating %s origin for %s' % (origin['type'], - origin['url']), + self.log.debug('Creating origin for %s' % origin['url'], extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) self.storage.origin_add_one(origin) - self.log.debug('Done creating %s origin for %s' % (origin['type'], - origin['url']), + self.log.debug('Done creating origin for %s' % origin['url'], extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_origin_visit(self, visit_date, visit_type): log_id = str(uuid.uuid4()) self.log.debug( 'Creating origin_visit for origin %s at time %s' % ( self.origin['url'], visit_date), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin_visit', 'swh_num': 1, 'swh_id': log_id }) origin_visit = self.storage.origin_visit_add( self.origin['url'], visit_date, visit_type) self.log.debug( 'Done Creating %s origin_visit for origin %s at time %s' % ( visit_type, self.origin['url'], visit_date), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin_visit', 'swh_num': 1, 'swh_id': log_id }) return origin_visit @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_tool(self, tool): log_id = str(uuid.uuid4()) self.log.debug( 'Creating tool with name %s version %s configuration %s' % ( tool['name'], tool['version'], tool['configuration']), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'tool', 'swh_num': 1, 'swh_id': log_id }) tools = self.storage.tool_add([tool]) tool_id = tools[0]['id'] self.log.debug( 'Done creating tool with name %s version %s and configuration %s' % ( # noqa tool['name'], tool['version'], tool['configuration']), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'tool', 'swh_num': 1, 'swh_id': log_id }) return tool_id @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_provider(self, provider): log_id = str(uuid.uuid4()) self.log.debug( 'Creating metadata_provider with name %s type %s url %s' % ( provider['provider_name'], provider['provider_type'], provider['provider_url']), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'metadata_provider', 'swh_num': 1, 'swh_id': log_id }) # FIXME: align metadata_provider_add with indexer_configuration_add _provider = self.storage.metadata_provider_get_by(provider) if _provider and 'id' in _provider: provider_id = _provider['id'] else: provider_id = self.storage.metadata_provider_add( provider['provider_name'], provider['provider_type'], provider['provider_url'], provider['metadata']) self.log.debug( 'Done creating metadata_provider with name %s type %s url %s' % ( provider['provider_name'], provider['provider_type'], provider['provider_url']), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'metadata_provider', 'swh_num': 1, 'swh_id': log_id }) return provider_id @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_origin_metadata(self, visit_date, provider_id, tool_id, metadata): log_id = str(uuid.uuid4()) self.log.debug( 'Creating origin_metadata for origin %s at time %s with provider_id %s and tool_id %s' % ( # noqa self.origin['url'], visit_date, provider_id, tool_id), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin_metadata', 'swh_num': 1, 'swh_id': log_id }) self.storage.origin_metadata_add( self.origin['url'], visit_date, provider_id, tool_id, metadata) self.log.debug( 'Done Creating origin_metadata for origin %s at time %s with provider %s and tool %s' % ( # noqa self.origin['url'], visit_date, provider_id, tool_id), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin_metadata', 'swh_num': 1, 'swh_id': log_id }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def update_origin_visit(self, status): log_id = str(uuid.uuid4()) self.log.debug( 'Updating origin_visit for origin %s with status %s' % ( self.origin['url'], status), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin_visit', 'swh_num': 1, 'swh_id': log_id }) self.storage.origin_visit_update( self.origin['url'], self.visit, status) self.log.debug( 'Done updating origin_visit for origin %s with status %s' % ( self.origin['url'], status), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin_visit', 'swh_num': 1, 'swh_id': log_id }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_contents(self, content_list): """Actually send properly formatted contents to the database. """ num_contents = len(content_list) if num_contents > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) result = self.storage.content_add(content_list) self.counters['contents'] += result['content:add'] self.log.debug("Done sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_directories(self, directory_list): """Actually send properly formatted directories to the database. """ num_directories = len(directory_list) if num_directories > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) result = self.storage.directory_add(directory_list) self.counters['directories'] += result['directory:add'] self.log.debug("Done sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_revisions(self, revision_list): """Actually send properly formatted revisions to the database. """ num_revisions = len(revision_list) if num_revisions > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) result = self.storage.revision_add(revision_list) self.counters['revisions'] += result['revision:add'] self.log.debug("Done sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_releases(self, release_list): """Actually send properly formatted releases to the database. """ num_releases = len(release_list) if num_releases > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) result = self.storage.release_add(release_list) self.counters['releases'] += result['release:add'] self.log.debug("Done sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_snapshot(self, snapshot): self.storage.snapshot_add([snapshot]) self.storage.origin_visit_update( self.origin['url'], self.visit, snapshot=snapshot['id']) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def filter_missing_contents(self, contents): """Return only the contents missing from swh""" max_content_size = self.config['content_size_limit'] contents_per_key = {} content_key = 'blake2s256' for content in contents: if content[content_key] in self.contents_seen: continue key = content[content_key] contents_per_key[key] = content self.contents_seen.add(key) for key in self.storage.content_missing( list(contents_per_key.values()), key_hash=content_key ): yield converters.content_for_storage( contents_per_key[key], max_content_size=max_content_size, origin_url=self.origin['url'], ) def bulk_send_contents(self, contents): """Format contents as swh contents and send them to the database. """ threshold_reached = self.contents.add( self.filter_missing_contents(contents)) if threshold_reached: self.send_batch_contents(self.contents.pop()) def filter_missing_directories(self, directories): """Return only directories missing from swh""" directories_per_id = {} search_dirs = [] for directory in directories: dir_id = directory['id'] if dir_id in self.directories_seen: continue search_dirs.append(dir_id) directories_per_id[dir_id] = directory self.directories_seen.add(dir_id) for dir_id in self.storage.directory_missing(search_dirs): yield directories_per_id[dir_id] def bulk_send_directories(self, directories): """Send missing directories to the database""" threshold_reached = self.directories.add( self.filter_missing_directories(directories)) if threshold_reached: self.send_batch_contents(self.contents.pop()) self.send_batch_directories(self.directories.pop()) def filter_missing_revisions(self, revisions): """Return only revisions missing from swh""" revisions_per_id = {} search_revs = [] for revision in revisions: rev_id = revision['id'] if rev_id in self.revisions_seen: continue search_revs.append(rev_id) revisions_per_id[rev_id] = revision self.revisions_seen.add(rev_id) for rev_id in self.storage.revision_missing(search_revs): yield revisions_per_id[rev_id] def bulk_send_revisions(self, revisions): """Send missing revisions to the database""" threshold_reached = self.revisions.add( self.filter_missing_revisions(revisions)) if threshold_reached: self.send_batch_contents(self.contents.pop()) self.send_batch_directories(self.directories.pop()) self.send_batch_revisions(self.revisions.pop()) def filter_missing_releases(self, releases): """Return only releases missing from swh""" releases_per_id = {} search_rels = [] for release in releases: rel_id = release['id'] if rel_id in self.releases_seen: continue search_rels.append(rel_id) releases_per_id[rel_id] = release self.releases_seen.add(rel_id) for rel_id in self.storage.release_missing(search_rels): yield releases_per_id[rel_id] def bulk_send_releases(self, releases): """Send missing releases to the database""" threshold_reached = self.releases.add( self.filter_missing_releases(releases)) if threshold_reached: self.send_batch_contents(self.contents.pop()) self.send_batch_directories(self.directories.pop()) self.send_batch_revisions(self.revisions.pop()) self.send_batch_releases(self.releases.pop()) def bulk_send_snapshot(self, snapshot): """Send missing releases to the database""" self.send_batch_contents(self.contents.pop()) self.send_batch_directories(self.directories.pop()) self.send_batch_revisions(self.revisions.pop()) self.send_batch_releases(self.releases.pop()) self.send_snapshot(snapshot) def maybe_load_contents(self, contents): """Load contents in swh-storage if need be. """ if self.config['send_contents']: self.bulk_send_contents(contents) def maybe_load_directories(self, directories): """Load directories in swh-storage if need be. """ if self.config['send_directories']: self.bulk_send_directories(directories) def maybe_load_revisions(self, revisions): """Load revisions in swh-storage if need be. """ if self.config['send_revisions']: self.bulk_send_revisions(revisions) def maybe_load_releases(self, releases): """Load releases in swh-storage if need be. """ if self.config['send_releases']: self.bulk_send_releases(releases) def maybe_load_snapshot(self, snapshot): """Load the snapshot in swh-storage if need be.""" if self.config['send_snapshot']: self.bulk_send_snapshot(snapshot) def send_batch_contents(self, contents): """Send contents batches to the storage""" packet_size = self.config['content_packet_size'] packet_size_bytes = self.config['content_packet_size_bytes'] send_in_packets(contents, self.send_contents, packet_size, packet_size_bytes=packet_size_bytes) def send_batch_directories(self, directories): """Send directories batches to the storage""" packet_size = self.config['directory_packet_size'] send_in_packets(directories, self.send_directories, packet_size) def send_batch_revisions(self, revisions): """Send revisions batches to the storage""" packet_size = self.config['revision_packet_size'] send_in_packets(revisions, self.send_revisions, packet_size) def send_batch_releases(self, releases): """Send releases batches to the storage """ packet_size = self.config['release_packet_size'] send_in_packets(releases, self.send_releases, packet_size) def flush(self): """Flush any potential dangling data not sent to swh-storage. Bypass the maybe_load_* methods which awaits threshold reached signal. We actually want to store those as we are done loading. """ contents = self.contents.pop() directories = self.directories.pop() revisions = self.revisions.pop() releases = self.releases.pop() # and send those to storage if asked if self.config['send_contents']: self.send_batch_contents(contents) if self.config['send_contents']: self.send_batch_directories(directories) if self.config['send_revisions']: self.send_batch_revisions(revisions) if self.config['send_releases']: self.send_batch_releases(releases) if self.config['send_snapshot'] and self.snapshot: self.send_snapshot(self.snapshot) def prepare_metadata(self): """First step for origin_metadata insertion, resolving the provider_id and the tool_id by fetching data from the storage or creating tool and provider on the fly if the data isn't available """ origin_metadata = self.origin_metadata tool = origin_metadata['tool'] try: tool_id = self.send_tool(tool) self.origin_metadata['tool']['tool_id'] = tool_id except Exception: self.log.exception('Problem when storing new tool') raise provider = origin_metadata['provider'] try: provider_id = self.send_provider(provider) self.origin_metadata['provider']['provider_id'] = provider_id except Exception: self.log.exception('Problem when storing new provider') raise @abstractmethod def cleanup(self): """Last step executed by the loader. """ pass @abstractmethod def prepare_origin_visit(self, *args, **kwargs): """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ pass def _store_origin_visit(self): """Store origin and visit references. Sets the self.origin_visit and self.visit references. """ origin = self.origin.copy() if 'id' in origin: # TODO: remove the condition when we finished migrating away # from origin ids del origin['id'] self.send_origin(origin) if not self.visit_date: # now as default visit_date if not provided self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) self.origin_visit = self.send_origin_visit( self.visit_date, self.visit_type) self.visit = self.origin_visit['visit'] @abstractmethod def prepare(self, *args, **kwargs): """Second step executed by the loader to prepare some state needed by the loader. """ pass def get_origin(self): """Get the origin that is currently being loaded. self.origin should be set in :func:`prepare_origin` Returns: dict: an origin ready to be sent to storage by :func:`origin_add_one`. """ return self.origin @abstractmethod def fetch_data(self): """Fetch the data from the source the loader is currently loading (ex: git/hg/svn/... repository). Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ pass @abstractmethod def store_data(self): """Store fetched data in the database. Should call the :func:`maybe_load_xyz` methods, which handle the bundles sent to storage, rather than send directly. """ pass def store_metadata(self): """Store fetched metadata in the database. For more information, see implementation in :class:`DepositLoader`. """ pass def load_status(self): """Detailed loading status. Defaults to logging an eventful load. Returns: a dictionary that is eventually passed back as the task's result to the scheduler, allowing tuning of the task recurrence mechanism. """ return { 'status': 'eventful', } def post_load(self, success=True): """Permit the loader to do some additional actions according to status after the loading is done. The flag success indicates the loading's status. Defaults to doing nothing. This is up to the implementer of this method to make sure this does not break. Args: success (bool): the success status of the loading """ pass def visit_status(self): """Detailed visit status. Defaults to logging a full visit. """ return 'full' def pre_cleanup(self): """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ pass def load(self, *args, **kwargs): r"""Loading logic for the loader to follow: - 1. Call :meth:`prepare_origin_visit` to prepare the origin and visit we will associate loading data to - 2. Store the actual ``origin_visit`` to storage - 3. Call :meth:`prepare` to prepare any eventual state - 4. Call :meth:`get_origin` to get the origin we work with and store - while True: - 5. Call :meth:`fetch_data` to fetch the data to store - 6. Call :meth:`store_data` to store the data - 7. Call :meth:`cleanup` to clean up any eventual state put in place in :meth:`prepare` method. """ try: self.pre_cleanup() except Exception: msg = 'Cleaning up dangling data failed! Continue loading.' self.log.warning(msg) self.prepare_origin_visit(*args, **kwargs) self._store_origin_visit() try: self.prepare(*args, **kwargs) while True: more_data_to_fetch = self.fetch_data() self.store_data() if not more_data_to_fetch: break self.store_metadata() self.update_origin_visit(status=self.visit_status()) self.post_load() except Exception: self.log.exception('Loading failure, updating to `partial` status', extra={ 'swh_task_args': args, 'swh_task_kwargs': kwargs, }) self.update_origin_visit(status='partial') self.post_load(success=False) return {'status': 'failed'} finally: self.flush() self.cleanup() return self.load_status() class UnbufferedLoader(BufferedLoader): """This base class is a pattern for unbuffered loaders. UnbufferedLoader loaders are able to load all the data in one go. For example, the loader defined in swh-loader-git :class:`BulkUpdater`. For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), inherit directly from :class:`BufferedLoader`. """ ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] def __init__(self, logging_class=None, config=None): super().__init__(logging_class=logging_class, config=config) self.visit_date = None # possibly overridden in self.prepare method def cleanup(self): """Clean up an eventual state installed for computations.""" pass def has_contents(self): """Checks whether we need to load contents""" return True def get_contents(self): """Get the contents that need to be loaded""" raise NotImplementedError def has_directories(self): """Checks whether we need to load directories""" return True def get_directories(self): """Get the directories that need to be loaded""" raise NotImplementedError def has_revisions(self): """Checks whether we need to load revisions""" return True def get_revisions(self): """Get the revisions that need to be loaded""" raise NotImplementedError def has_releases(self): """Checks whether we need to load releases""" return True def get_releases(self): """Get the releases that need to be loaded""" raise NotImplementedError def get_snapshot(self): """Get the snapshot that needs to be loaded""" raise NotImplementedError def eventful(self): """Whether the load was eventful""" raise NotImplementedError def save_data(self): """Save the data associated to the current load""" raise NotImplementedError def flush(self): """Unbuffered loader does not flush since it has no state to flush. """ pass def store_data(self): if self.config['save_data']: self.save_data() if self.config['send_contents'] and self.has_contents(): self.send_batch_contents(self.get_contents()) if self.config['send_directories'] and self.has_directories(): self.send_batch_directories(self.get_directories()) if self.config['send_revisions'] and self.has_revisions(): self.send_batch_revisions(self.get_revisions()) if self.config['send_releases'] and self.has_releases(): self.send_batch_releases(self.get_releases()) if self.config['send_snapshot']: self.send_snapshot(self.get_snapshot()) diff --git a/swh/loader/core/tests/__init__.py b/swh/loader/core/tests/__init__.py index 0b93864..a2e0d6a 100644 --- a/swh/loader/core/tests/__init__.py +++ b/swh/loader/core/tests/__init__.py @@ -1,220 +1,218 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import shutil import subprocess import tempfile from unittest import TestCase from swh.model import hashutil from swh.model.hashutil import hash_to_bytes class BaseLoaderStorageTest: def _assertCountEqual(self, type, expected_length, msg=None): """Check typed 'type' state to have the same expected length. """ self.storage.refresh_stat_counters() self.assertEqual(self.storage.stat_counters()[type], expected_length, msg=msg) def assertCountContents(self, len_expected_contents, msg=None): self._assertCountEqual('content', len_expected_contents, msg=msg) def assertCountDirectories(self, len_expected_directories, msg=None): self._assertCountEqual('directory', len_expected_directories, msg=msg) def assertCountReleases(self, len_expected_releases, msg=None): self._assertCountEqual('release', len_expected_releases, msg=msg) def assertCountRevisions(self, len_expected_revisions, msg=None): self._assertCountEqual('revision', len_expected_revisions, msg=msg) def assertCountSnapshots(self, len_expected_snapshot, msg=None): self._assertCountEqual('snapshot', len_expected_snapshot, msg=msg) def assertContentsContain(self, expected_contents): """Check the provided content are a subset of the stored ones. Args: expected_contents ([sha1]): List of content ids""" missing = list(self.storage.content_missing( {'sha1': hash_to_bytes(content_hash)} for content_hash in expected_contents)) self.assertEqual(missing, []) def assertDirectoriesContain(self, expected_directories): """Check the provided directories are a subset of the stored ones. Args: expected_directories ([sha1]): List of directory ids.""" missing = list(self.storage.directory_missing( hash_to_bytes(dir_) for dir_ in expected_directories)) self.assertEqual(missing, []) def assertReleasesContain(self, expected_releases): """Check the provided releases are a subset of the stored ones. Args: releases (list): list of swh releases' identifiers. """ missing = list(self.storage.release_missing( hash_to_bytes(rel) for rel in expected_releases)) self.assertEqual(missing, []) def assertRevisionsContain(self, expected_revisions): """Check the provided revisions are a subset of the stored ones. Expects self.loader to be instantiated and ready to be inspected (meaning the loading took place). Args: expected_revisions (dict): Dict with key revision id, value the targeted directory id. """ revs = list(self.storage.revision_get( hashutil.hash_to_bytes(rev_id) for rev_id in expected_revisions)) self.assertNotIn(None, revs) self.assertEqual( {rev['id']: rev['directory'] for rev in revs}, {hash_to_bytes(rev_id): hash_to_bytes(rev_dir) for (rev_id, rev_dir) in expected_revisions.items()}) def assertSnapshotEqual(self, expected_snapshot, expected_branches=[]): """Check for snapshot match. Provide the hashes as hexadecimal, the conversion is done within the method. Args: expected_snapshot (str/dict): Either the snapshot identifier or the full snapshot expected_branches (dict): expected branches or nothing is the full snapshot is provided """ if isinstance(expected_snapshot, dict) and not expected_branches: expected_snapshot_id = expected_snapshot['id'] expected_branches = expected_snapshot['branches'] else: expected_snapshot_id = expected_snapshot snap = self.storage.snapshot_get(hash_to_bytes(expected_snapshot_id)) self.assertIsNotNone(snap) def decode_target(target): if not target: return target target_type = target['target_type'] if target_type == 'alias': decoded_target = target['target'].decode('utf-8') else: decoded_target = hashutil.hash_to_hex(target['target']) return { 'target': decoded_target, 'target_type': target_type } branches = { branch.decode('utf-8'): decode_target(target) for branch, target in snap['branches'].items() } self.assertEqual(expected_branches, branches) - def assertOriginMetadataContains(self, origin_type, origin_url, + def assertOriginMetadataContains(self, origin_url, expected_origin_metadata): """Check the storage contains this metadata for the given origin. Args: - origin_type (str): type of origin ('deposit', 'git', 'svn', ...) origin_url (str): URL of the origin expected_origin_metadata (dict): Extrinsic metadata of the origin """ - origin = self.storage.origin_get( - dict(type=origin_type, url=origin_url)) + origin = self.storage.origin_get({'url': origin_url}) results = self.storage.origin_metadata_get_by(origin['url']) self.assertEqual(len(results), 1, results) result = results[0] self.assertEqual(result['metadata'], expected_origin_metadata) @pytest.mark.fs class BaseLoaderTest(TestCase, BaseLoaderStorageTest): """Mixin base loader test class. This allows to uncompress archives (mercurial, svn, git, ... repositories) into a temporary folder so that the loader under test can work with this. When setUp() is done, the following variables are defined: - self.repo_url: can be used as an origin_url for example - self.destination_path: can be used as a path to ingest the repository. Args: archive_name (str): Name of the archive holding the repository (folder, repository, dump, etc...) start_path (str): (mandatory) Path from where starting to look for resources filename (Optional[str]): Name of the filename/folder once the archive is uncompressed. When the filename is not provided, the archive name is used as a derivative. This is used both for the self.repo_url and self.destination_path computation (this one only when provided) resources_path (str): Folder name to look for archive prefix_tmp_folder_name (str): Prefix name to name the temporary folder uncompress_archive (bool): Uncompress the archive passed as parameters (default to True). It so happens we could avoid doing anything to the tarball. """ def setUp(self, archive_name, *, start_path, filename=None, resources_path='resources', prefix_tmp_folder_name='', uncompress_archive=True): super().setUp() repo_path = os.path.join(start_path, resources_path, archive_name) if not uncompress_archive: # In that case, simply sets the archive's path self.destination_path = repo_path self.tmp_root_path = None self.repo_url = 'file://' + repo_path return tmp_root_path = tempfile.mkdtemp( prefix=prefix_tmp_folder_name, suffix='-tests') # uncompress folder/repositories/dump for the loader to ingest subprocess.check_output(['tar', 'xf', repo_path, '-C', tmp_root_path]) # build the origin url (or some derivative form) _fname = filename if filename else os.path.basename(archive_name) self.repo_url = 'file://' + tmp_root_path + '/' + _fname # where is the data to ingest? if filename: # archive holds one folder with name self.destination_path = os.path.join(tmp_root_path, filename) else: self.destination_path = tmp_root_path self.tmp_root_path = tmp_root_path def tearDown(self): """Clean up temporary working directory """ if self.tmp_root_path and os.path.exists(self.tmp_root_path): shutil.rmtree(self.tmp_root_path) diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py index 10617b9..f258925 100644 --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -1,369 +1,368 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import hashlib import logging import pytest from swh.model.hashutil import hash_to_bytes from swh.loader.core.loader import BufferedLoader, UnbufferedLoader from . import BaseLoaderTest class DummyLoader: def cleanup(self): pass def prepare(self, *args, **kwargs): pass def fetch_data(self): pass def store_data(self): pass def prepare_origin_visit(self, *args, **kwargs): origin = self.storage.origin_get( self._test_prepare_origin_visit_data['origin']) self.origin = origin self.origin_url = origin['url'] self.visit_date = datetime.datetime.utcnow() - self.storage.origin_visit_add(origin['id'], self.visit_date) + self.visit_type = 'git' + self.storage.origin_visit_add(origin['id'], self.visit_date, + self.visit_type) def parse_config_file(self, *args, **kwargs): return { 'storage': { 'cls': 'memory', 'args': { } }, 'send_contents': True, 'send_directories': True, 'send_revisions': True, 'send_releases': True, 'send_snapshot': True, 'content_packet_size': 2, 'content_packet_size_bytes': 8, 'directory_packet_size': 2, 'revision_packet_size': 2, 'release_packet_size': 2, 'content_size_limit': 10000, } class DummyUnbufferedLoader(DummyLoader, UnbufferedLoader): pass class DummyBufferedLoader(DummyLoader, BufferedLoader): pass class DummyBaseLoaderTest(BaseLoaderTest): def setUp(self): self.loader = self.loader_class(logging_class='dummyloader') self.loader.visit_type = 'git' # do not call voluntarily super().setUp() self.storage = self.loader.storage contents = [ { 'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689', 'sha1_git': b'bar1', 'sha256': b'baz1', 'blake2s256': b'qux1', 'status': 'visible', 'data': b'data1', 'length': 5, }, { 'sha1': '61c2b3a30496d329e21af70dd2d7e097046d07b7', 'sha1_git': b'bar2', 'sha256': b'baz2', 'blake2s256': b'qux2', 'status': 'visible', 'data': b'data2', 'length': 5, }, ] self.expected_contents = [content['sha1'] for content in contents] self.in_contents = contents.copy() for content in self.in_contents: content['sha1'] = hash_to_bytes(content['sha1']) self.in_directories = [ {'id': hash_to_bytes(id_), 'entries': []} for id_ in [ '44e45d56f88993aae6a0198013efa80716fd8921', '54e45d56f88993aae6a0198013efa80716fd8920', '43e45d56f88993aae6a0198013efa80716fd8920', ] ] person = { 'name': b'John Doe', 'email': b'john.doe@institute.org', 'fullname': b'John Doe ' } rev1_id = b'\x00'*20 rev2_id = b'\x01'*20 self.in_revisions = [ { 'id': rev1_id, 'type': 'git', 'date': 1567591673, 'committer_date': 1567591673, 'author': person, 'committer': person, 'message': b'msg1', 'directory': None, 'synthetic': False, 'metadata': None, 'parents': [], }, { 'id': rev2_id, 'type': 'hg', 'date': 1567591673, 'committer_date': 1567591673, 'author': person, 'committer': person, 'message': b'msg2', 'directory': None, 'synthetic': False, 'metadata': None, 'parents': [], }, ] self.in_releases = [ { 'name': b'rel1', 'id': b'\x02'*20, 'date': None, 'author': person, 'target_type': 'revision', 'target': rev1_id, 'message': None, 'synthetic': False, }, { 'name': b'rel2', 'id': b'\x03'*20, 'date': None, 'author': person, 'target_type': 'revision', 'target': rev2_id, 'message': None, 'synthetic': False, }, ] self.in_origin = { 'type': self.loader.visit_type, 'url': 'http://example.com/', } self.in_snapshot = { 'id': b'snap1', 'branches': {}, } self.in_provider = { 'provider_name': 'Test Provider', 'provider_type': 'test_provider', 'provider_url': 'http://example.org/metadata_provider', 'metadata': {'working': True}, } self.in_tool = { 'name': 'Test Tool', 'version': 'v1.2.3', 'configuration': {'in_the_Matrix': 'maybe'}, } self.storage.origin_add([self.in_origin]) # used by prepare_origin_visit() when it gets called self.loader._test_prepare_origin_visit_data = { 'origin': self.in_origin, } def tearDown(self): # do not call voluntarily super().tearDown() pass class CoreUnbufferedLoaderTest(DummyBaseLoaderTest): loader_class = DummyUnbufferedLoader def test_unbuffered_loader(self): self.loader.load() # initialize the loader self.loader.send_contents(self.in_contents[0:1]) self.loader.send_directories(self.in_directories[0:1]) self.loader.send_revisions(self.in_revisions[0:1]) self.loader.send_releases(self.in_releases[0:1]) self.assertCountContents(1) self.assertCountDirectories(1) self.assertCountRevisions(1) self.assertCountReleases(1) self.loader.send_contents(self.in_contents[1:]) self.loader.send_directories(self.in_directories[1:]) self.loader.send_revisions(self.in_revisions[1:]) self.loader.send_releases(self.in_releases[1:]) self.assertCountContents(len(self.in_contents)) self.assertCountDirectories(len(self.in_directories)) self.assertCountRevisions(len(self.in_revisions)) self.assertCountReleases(len(self.in_releases)) class CoreBufferedLoaderTest(DummyBaseLoaderTest): loader_class = DummyBufferedLoader def test_buffered_loader(self): self.loader.load() # initialize the loader self.loader.maybe_load_contents(self.in_contents[0:1]) self.loader.maybe_load_directories(self.in_directories[0:1]) self.loader.maybe_load_revisions(self.in_revisions[0:1]) self.loader.maybe_load_releases(self.in_releases[0:1]) self.assertCountContents(0) self.assertCountDirectories(0) self.assertCountRevisions(0) self.assertCountReleases(0) self.loader.maybe_load_contents(self.in_contents[1:]) self.loader.maybe_load_directories(self.in_directories[1:]) self.loader.maybe_load_revisions(self.in_revisions[1:]) self.loader.maybe_load_releases(self.in_releases[1:]) self.assertCountContents(len(self.in_contents)) self.assertCountDirectories(len(self.in_directories)) self.assertCountRevisions(len(self.in_revisions)) self.assertCountReleases(len(self.in_releases)) def test_directory_cascade(self): """Checks that sending a directory triggers sending contents""" self.loader.load() # initialize the loader self.loader.maybe_load_contents(self.in_contents[0:1]) self.loader.maybe_load_directories(self.in_directories) self.assertCountContents(1) self.assertCountDirectories(len(self.in_directories)) def test_revision_cascade(self): """Checks that sending a revision triggers sending contents and directories.""" self.loader.load() # initialize the loader self.loader.maybe_load_contents(self.in_contents[0:1]) self.loader.maybe_load_directories(self.in_directories[0:1]) self.loader.maybe_load_revisions(self.in_revisions) self.assertCountContents(1) self.assertCountDirectories(1) self.assertCountRevisions(len(self.in_revisions)) def test_release_cascade(self): """Checks that sending a release triggers sending revisions, contents, and directories.""" self.loader.load() # initialize the loader self.loader.maybe_load_contents(self.in_contents[0:1]) self.loader.maybe_load_directories(self.in_directories[0:1]) self.loader.maybe_load_revisions(self.in_revisions[0:1]) self.loader.maybe_load_releases(self.in_releases) self.assertCountContents(1) self.assertCountDirectories(1) self.assertCountRevisions(1) self.assertCountReleases(len(self.in_releases)) def test_snapshot_cascade(self): """Checks that sending a snapshot triggers sending releases, revisions, contents, and directories.""" self.loader.load() # initialize the loader self.loader.maybe_load_contents(self.in_contents[0:1]) self.loader.maybe_load_directories(self.in_directories[0:1]) self.loader.maybe_load_revisions(self.in_revisions[0:1]) self.loader.maybe_load_releases(self.in_releases[0:1]) self.loader.maybe_load_snapshot(self.in_snapshot) self.assertCountContents(1) self.assertCountDirectories(1) self.assertCountRevisions(1) self.assertCountReleases(1) self.assertCountSnapshots(1) def test_origin_metadata(self): self.loader.load() provider_id = self.loader.send_provider(self.in_provider) tool_id = self.loader.send_tool(self.in_tool) self.loader.send_origin_metadata( self.loader.visit_date, provider_id, tool_id, {'test_metadata': 'foobar'}) - self.assertOriginMetadataContains( - self.in_origin['type'], self.in_origin['url'], - {'test_metadata': 'foobar'}) + self.assertOriginMetadataContains(self.in_origin['url'], + {'test_metadata': 'foobar'}) with self.assertRaises(AssertionError): - self.assertOriginMetadataContains( - self.in_origin['type'], self.in_origin['url'], - {'test_metadata': 'foobarbaz'}) + self.assertOriginMetadataContains(self.in_origin['url'], + {'test_metadata': 'foobarbaz'}) with self.assertRaises(Exception): - self.assertOriginMetadataContains( - self.in_origin['type'], self.in_origin['url'] + 'blah', - {'test_metadata': 'foobar'}) + self.assertOriginMetadataContains(self.in_origin['url'] + 'blah', + {'test_metadata': 'foobar'}) def test_loader_logger_default_name(): loader = DummyBufferedLoader() assert isinstance(loader.log, logging.Logger) assert loader.log.name == \ 'swh.loader.core.tests.test_loader.DummyBufferedLoader' loader = DummyUnbufferedLoader() assert isinstance(loader.log, logging.Logger) assert loader.log.name == \ 'swh.loader.core.tests.test_loader.DummyUnbufferedLoader' def test_loader_logger_with_name(): loader = DummyBufferedLoader('some.logger.name') assert isinstance(loader.log, logging.Logger) assert loader.log.name == \ 'some.logger.name' @pytest.mark.fs def test_loader_save_data_path(tmp_path): loader = DummyBufferedLoader('some.logger.name.1') url = 'http://bitbucket.org/something' loader.origin = { 'url': url, } loader.visit_date = datetime.datetime(year=2019, month=10, day=1) loader.config = { 'save_data_path': tmp_path, } hash_url = hashlib.sha1(url.encode('utf-8')).hexdigest() expected_save_path = '%s/sha1:%s/%s/2019' % ( str(tmp_path), hash_url[0:2], hash_url ) save_path = loader.get_save_data_path() assert save_path == expected_save_path