diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
index 1ebaed9..2f72dd2 100644
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -1,682 +1,700 @@
 # Copyright (C) 2015-2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import logging
 import psycopg2
 import requests
 import traceback
 import uuid

 from abc import ABCMeta, abstractmethod
 from retrying import retry

 from . import converters
 from swh.core import config
 from swh.storage import get_storage

 from .queue import QueuePerSizeAndNbUniqueElements
 from .queue import QueuePerNbUniqueElements
 from .queue import QueuePerNbElements


 def retry_loading(error):
     """Retry policy when the database raises an integrity error"""
     exception_classes = [
         # raised when two parallel insertions insert the same data.
         psycopg2.IntegrityError,
         # raised when uWSGI restarts and hangs up on the worker.
         requests.exceptions.ConnectionError,
     ]

     if not any(isinstance(error, exc) for exc in exception_classes):
         return False

     logger = logging.getLogger('swh.loader')

     error_name = error.__module__ + '.' + error.__class__.__name__
     logger.warning('Retry loading a batch', exc_info=False, extra={
         'swh_type': 'storage_retry',
         'swh_exception_type': error_name,
         'swh_exception': traceback.format_exception(
             error.__class__,
             error,
             error.__traceback__,
         ),
     })

     return True


 class SWHLoader(config.SWHConfig, metaclass=ABCMeta):
     """Mixin base class for loaders.

     To use this class, you must:

     - inherit from this class
     - implement the @abstractmethod methods:

       - :func:`prepare`: First step executed by the loader to prepare some
         state needed by the :func:`load` method.

       - :func:`get_origin`: Retrieve the origin that is currently being
         loaded.

       - :func:`fetch_data`: Fetch the data to inject into swh (it is then
         stored through the :func:`store_data` method).

       - :func:`store_data`: Store the fetched data.

       - :func:`visit_status`: Explicit status of the visit ('partial' or
         'full').

       - :func:`load_status`: Explicit status of the loading, for use by the
         scheduler (eventful/uneventful/temporary failure/permanent failure).

       - :func:`cleanup`: Last step executed by the loader.

     The entry point for the resulting loader is :func:`load`.

     You can take a look at some example classes:

     - :class:`BaseSvnLoader`
     - :class:`TarLoader`
     - :class:`DirLoader`
     - :class:`DebianLoader`

     """
     CONFIG_BASE_FILENAME = None

     DEFAULT_CONFIG = {
         'storage': ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5002/',
             }
         }),

         'send_contents': ('bool', True),
         'send_directories': ('bool', True),
         'send_revisions': ('bool', True),
         'send_releases': ('bool', True),
         'send_occurrences': ('bool', True),

         # Number of contents
         'content_packet_size': ('int', 10000),
         # If this size threshold is reached, the content is considered
         # missing in swh-storage
         'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
         # packets of 100 MiB of contents
         'content_packet_block_size_bytes': ('int', 100 * 1024 * 1024),

         'directory_packet_size': ('int', 25000),
         'revision_packet_size': ('int', 100000),
         'release_packet_size': ('int', 100000),
         'occurrence_packet_size': ('int', 100000),
     }

     ADDITIONAL_CONFIG = {}

     def __init__(self, logging_class, config=None):
         if config:
             self.config = config
         else:
             self.config = self.parse_config_file(
                 additional_configs=[self.ADDITIONAL_CONFIG])

         self.storage = get_storage(**self.config['storage'])

         self.log = logging.getLogger(logging_class)

         self.contents = QueuePerSizeAndNbUniqueElements(
             key='sha1',
             max_nb_elements=self.config['content_packet_size'],
             max_size=self.config['content_packet_block_size_bytes'])

         self.contents_seen = set()

         self.directories = QueuePerNbUniqueElements(
             key='id',
             max_nb_elements=self.config['directory_packet_size'])

         self.directories_seen = set()

         self.revisions = QueuePerNbUniqueElements(
             key='id',
             max_nb_elements=self.config['revision_packet_size'])

         self.revisions_seen = set()

         self.releases = QueuePerNbUniqueElements(
             key='id',
             max_nb_elements=self.config['release_packet_size'])

         self.releases_seen = set()

         self.occurrences = QueuePerNbElements(
             self.config['occurrence_packet_size'])

         l = logging.getLogger('requests.packages.urllib3.connectionpool')
         l.setLevel(logging.WARN)

         self.counters = {
             'contents': 0,
             'directories': 0,
             'revisions': 0,
             'releases': 0,
             'occurrences': 0,
         }

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_origin(self, origin):
         log_id = str(uuid.uuid4())
         self.log.debug('Creating %s origin for %s'
                        % (origin['type'], origin['url']),
                        extra={
                            'swh_type': 'storage_send_start',
                            'swh_content_type': 'origin',
                            'swh_num': 1,
                            'swh_id': log_id
                        })
         origin_id = self.storage.origin_add_one(origin)
         self.log.debug('Done creating %s origin for %s'
                        % (origin['type'], origin['url']),
                        extra={
                            'swh_type': 'storage_send_end',
                            'swh_content_type': 'origin',
                            'swh_num': 1,
                            'swh_id': log_id
                        })

         return origin_id

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_origin_visit(self, origin_id, visit_date):
         log_id = str(uuid.uuid4())
         self.log.debug(
             'Creating origin_visit for origin %s at time %s' % (
                 origin_id, visit_date),
             extra={
                 'swh_type': 'storage_send_start',
                 'swh_content_type': 'origin_visit',
                 'swh_num': 1,
                 'swh_id': log_id
             })

         origin_visit = self.storage.origin_visit_add(origin_id, visit_date)

         self.log.debug(
             'Done creating origin_visit for origin %s at time %s' % (
                 origin_id, visit_date),
             extra={
                 'swh_type': 'storage_send_end',
                 'swh_content_type': 'origin_visit',
                 'swh_num': 1,
                 'swh_id': log_id
             })

         return origin_visit

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def update_origin_visit(self, origin_id, visit, status):
         log_id = str(uuid.uuid4())
         self.log.debug(
             'Updating origin_visit for origin %s with status %s' % (
                 origin_id, status),
             extra={
                 'swh_type': 'storage_send_start',
                 'swh_content_type': 'origin_visit',
                 'swh_num': 1,
                 'swh_id': log_id
             })

         self.storage.origin_visit_update(origin_id, visit, status)

         self.log.debug(
             'Done updating origin_visit for origin %s with status %s' % (
                 origin_id, status),
             extra={
                 'swh_type': 'storage_send_end',
                 'swh_content_type': 'origin_visit',
                 'swh_num': 1,
                 'swh_id': log_id
             })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_contents(self, content_list):
         """Actually send properly formatted contents to the database.

         """
         num_contents = len(content_list)
         if num_contents > 0:
             log_id = str(uuid.uuid4())
             self.log.debug("Sending %d contents" % num_contents,
                            extra={
                                'swh_type': 'storage_send_start',
                                'swh_content_type': 'content',
                                'swh_num': num_contents,
                                'swh_id': log_id,
                            })
             self.storage.content_add(content_list)
             self.counters['contents'] += num_contents
             self.log.debug("Done sending %d contents" % num_contents,
                            extra={
                                'swh_type': 'storage_send_end',
                                'swh_content_type': 'content',
                                'swh_num': num_contents,
                                'swh_id': log_id,
                            })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_directories(self, directory_list):
         """Actually send properly formatted directories to the database.

         """
         num_directories = len(directory_list)
         if num_directories > 0:
             log_id = str(uuid.uuid4())
             self.log.debug("Sending %d directories" % num_directories,
                            extra={
                                'swh_type': 'storage_send_start',
                                'swh_content_type': 'directory',
                                'swh_num': num_directories,
                                'swh_id': log_id,
                            })
             self.storage.directory_add(directory_list)
             self.counters['directories'] += num_directories
             self.log.debug("Done sending %d directories" % num_directories,
                            extra={
                                'swh_type': 'storage_send_end',
                                'swh_content_type': 'directory',
                                'swh_num': num_directories,
                                'swh_id': log_id,
                            })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_revisions(self, revision_list):
         """Actually send properly formatted revisions to the database.

         """
         num_revisions = len(revision_list)
         if num_revisions > 0:
             log_id = str(uuid.uuid4())
             self.log.debug("Sending %d revisions" % num_revisions,
                            extra={
                                'swh_type': 'storage_send_start',
                                'swh_content_type': 'revision',
                                'swh_num': num_revisions,
                                'swh_id': log_id,
                            })
             self.storage.revision_add(revision_list)
             self.counters['revisions'] += num_revisions
             self.log.debug("Done sending %d revisions" % num_revisions,
                            extra={
                                'swh_type': 'storage_send_end',
                                'swh_content_type': 'revision',
                                'swh_num': num_revisions,
                                'swh_id': log_id,
                            })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_releases(self, release_list):
         """Actually send properly formatted releases to the database.

         """
         num_releases = len(release_list)
         if num_releases > 0:
             log_id = str(uuid.uuid4())
             self.log.debug("Sending %d releases" % num_releases,
                            extra={
                                'swh_type': 'storage_send_start',
                                'swh_content_type': 'release',
                                'swh_num': num_releases,
                                'swh_id': log_id,
                            })
             self.storage.release_add(release_list)
             self.counters['releases'] += num_releases
             self.log.debug("Done sending %d releases" % num_releases,
                            extra={
                                'swh_type': 'storage_send_end',
                                'swh_content_type': 'release',
                                'swh_num': num_releases,
                                'swh_id': log_id,
                            })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_occurrences(self, occurrence_list):
         """Actually send properly formatted occurrences to the database.

         """
         num_occurrences = len(occurrence_list)
         if num_occurrences > 0:
             log_id = str(uuid.uuid4())
             self.log.debug("Sending %d occurrences" % num_occurrences,
                            extra={
                                'swh_type': 'storage_send_start',
                                'swh_content_type': 'occurrence',
                                'swh_num': num_occurrences,
                                'swh_id': log_id,
                            })
             self.storage.occurrence_add(occurrence_list)
             self.counters['occurrences'] += num_occurrences
             self.log.debug("Done sending %d occurrences" % num_occurrences,
                            extra={
                                'swh_type': 'storage_send_end',
                                'swh_content_type': 'occurrence',
                                'swh_num': num_occurrences,
                                'swh_id': log_id,
                            })

     def filter_missing_contents(self, contents):
         """Return only the contents missing from swh"""
         max_content_size = self.config['content_packet_size_bytes']
         contents_per_key = {}
         content_key = 'blake2s256'

         for content in contents:
             if content[content_key] in self.contents_seen:
                 continue
             key = content[content_key]
             contents_per_key[key] = content
             self.contents_seen.add(key)

         for key in self.storage.content_missing(
                 list(contents_per_key.values()),
                 key_hash=content_key):
             yield converters.content_for_storage(
                 contents_per_key[key],
                 max_content_size=max_content_size,
                 origin_id=self.origin_id,
             )

     def bulk_send_contents(self, contents):
         """Format contents as swh contents and send them to the database.

         """
         threshold_reached = self.contents.add(
             self.filter_missing_contents(contents))
         if threshold_reached:
             self.send_contents(self.contents.pop())

     def filter_missing_directories(self, directories):
         """Return only directories missing from swh"""
         directories_per_id = {}
         search_dirs = []

         for directory in directories:
             dir_id = directory['id']
             if dir_id in self.directories_seen:
                 continue
             search_dirs.append(dir_id)
             directories_per_id[dir_id] = directory
             self.directories_seen.add(dir_id)

         for dir_id in self.storage.directory_missing(search_dirs):
             yield directories_per_id[dir_id]

     def bulk_send_directories(self, directories):
         """Send missing directories to the database"""
         threshold_reached = self.directories.add(
             self.filter_missing_directories(directories))
         if threshold_reached:
             self.send_contents(self.contents.pop())
             self.send_directories(self.directories.pop())

     def filter_missing_revisions(self, revisions):
         """Return only revisions missing from swh"""
         revisions_per_id = {}
         search_revs = []

         for revision in revisions:
             rev_id = revision['id']
             if rev_id in self.revisions_seen:
                 continue
             search_revs.append(rev_id)
             revisions_per_id[rev_id] = revision
             self.revisions_seen.add(rev_id)

         for rev_id in self.storage.revision_missing(search_revs):
             yield revisions_per_id[rev_id]

     def bulk_send_revisions(self, revisions):
         """Send missing revisions to the database"""
         threshold_reached = self.revisions.add(
             self.filter_missing_revisions(revisions))
         if threshold_reached:
             self.send_contents(self.contents.pop())
             self.send_directories(self.directories.pop())
             self.send_revisions(self.revisions.pop())

     def filter_missing_releases(self, releases):
         """Return only releases missing from swh"""
         releases_per_id = {}
         search_rels = []

         for release in releases:
             rel_id = release['id']
             if rel_id in self.releases_seen:
                 continue
             search_rels.append(rel_id)
             releases_per_id[rel_id] = release
             self.releases_seen.add(rel_id)

         for rel_id in self.storage.release_missing(search_rels):
             yield releases_per_id[rel_id]

     def bulk_send_releases(self, releases):
         """Send missing releases to the database"""
         threshold_reached = self.releases.add(
             self.filter_missing_releases(releases))
         if threshold_reached:
             self.send_contents(self.contents.pop())
             self.send_directories(self.directories.pop())
             self.send_revisions(self.revisions.pop())
             self.send_releases(self.releases.pop())

     def bulk_send_occurrences(self, occurrences):
         """Send the occurrences to the SWH archive"""
         threshold_reached = self.occurrences.add(occurrences)
         if threshold_reached:
             self.send_contents(self.contents.pop())
             self.send_directories(self.directories.pop())
             self.send_revisions(self.revisions.pop())
             self.send_releases(self.releases.pop())
             self.send_occurrences(self.occurrences.pop())

     def maybe_load_contents(self, contents):
         """Load contents in swh-storage if need be.

         """
         if self.config['send_contents']:
             self.bulk_send_contents(contents)

     def maybe_load_directories(self, directories):
         """Load directories in swh-storage if need be.

         """
         if self.config['send_directories']:
             self.bulk_send_directories(directories)

     def maybe_load_revisions(self, revisions):
         """Load revisions in swh-storage if need be.

         """
         if self.config['send_revisions']:
             self.bulk_send_revisions(revisions)

     def maybe_load_releases(self, releases):
         """Load releases in swh-storage if need be.

         """
         if self.config['send_releases']:
             self.bulk_send_releases(releases)

     def maybe_load_occurrences(self, occurrences):
         """Load occurrences in swh-storage if need be.

         """
         if self.config['send_occurrences']:
             self.bulk_send_occurrences(occurrences)

     def open_fetch_history(self):
         return self.storage.fetch_history_start(self.origin_id)

     def close_fetch_history_success(self, fetch_history_id):
         data = {
             'status': True,
             'result': self.counters,
         }
         return self.storage.fetch_history_end(fetch_history_id, data)

     def close_fetch_history_failure(self, fetch_history_id):
         data = {
             'status': False,
             'stderr': traceback.format_exc(),
         }
         if self.counters['contents'] > 0 or \
            self.counters['directories'] > 0 or \
            self.counters['revisions'] > 0 or \
            self.counters['releases'] > 0 or \
            self.counters['occurrences'] > 0:
             data['result'] = self.counters
         return self.storage.fetch_history_end(fetch_history_id, data)

     def flush(self):
         """Flush any potential dangling data not sent to swh-storage.

         """
         contents = self.contents.pop()
         directories = self.directories.pop()
         revisions = self.revisions.pop()
         occurrences = self.occurrences.pop()
         releases = self.releases.pop()

         # and send those to storage if asked
         if self.config['send_contents']:
             self.send_contents(contents)
         if self.config['send_directories']:
             self.send_directories(directories)
         if self.config['send_revisions']:
             self.send_revisions(revisions)
         if self.config['send_occurrences']:
             self.send_occurrences(occurrences)
         if self.config['send_releases']:
             self.send_releases(releases)

     @abstractmethod
     def cleanup(self):
         """Last step executed by the loader.

         """
         pass

     @abstractmethod
     def prepare(self, *args, **kwargs):
         """First step executed by the loader to prepare some state needed by
         the loader.

         """
         pass

     @abstractmethod
     def get_origin(self):
         """Get the origin that is currently being loaded.

         Returns:
             dict: an origin ready to be sent to storage by
             :func:`origin_add_one`.

         """
         pass

     @abstractmethod
     def fetch_data(self):
         """Fetch the data we want to store.

         Returns:
             a value that is interpreted as a boolean. If True, fetch_data
             needs to be called again to complete loading.

         """
         pass

     @abstractmethod
     def store_data(self):
         """Store the data we fetched in the database.

         Should call the :func:`maybe_load_xyz` methods, which handle the
         bundles sent to storage, rather than sending directly.

         """
         pass

     def load_status(self):
         """Detailed loading status.

         Defaults to logging an eventful load.

         Returns:
             a dictionary that is eventually passed back as the task's
             result to the scheduler, allowing tuning of the task
             recurrence mechanism.

         """
         return {
             'status': 'eventful',
         }

+    def post_load(self, success=True):
+        """Allow the loader to perform additional actions after the loading
+        is done, according to its status. The success flag indicates
+        whether the loading succeeded.
+
+        Defaults to doing nothing.
+
+        It is up to the implementer of this method to make sure it does
+        not break the loader.
+
+        Args:
+            success (bool): the success status of the loading
+
+        """
+        pass
+
     def visit_status(self):
         """Detailed visit status.

         Defaults to logging a full visit.

         """
         return 'full'

     def load(self, *args, **kwargs):
         """Loading logic for the loader to follow:

         - 1. def prepare(\*args, \**kwargs): Prepare any eventual state
         - 2. def get_origin(): Get the origin we work with and store it

         - while True:

           - 3. def fetch_data(): Fetch the data to store
           - 4. def store_data(): Store the data

         - 5. def cleanup(): Clean up any eventual state put in place in the
              prepare method.

         """
         self.prepare(*args, **kwargs)
         origin = self.get_origin()
         self.origin_id = self.send_origin(origin)

         fetch_history_id = self.open_fetch_history()

         if self.visit_date:  # overwriting the visit_date if provided
             visit_date = self.visit_date
         else:
             visit_date = datetime.datetime.now(tz=datetime.timezone.utc)

         origin_visit = self.send_origin_visit(
             self.origin_id,
             visit_date)
         self.visit = origin_visit['visit']

         try:
             while True:
                 more_data_to_fetch = self.fetch_data()
                 self.store_data()
                 if not more_data_to_fetch:
                     break

             self.close_fetch_history_success(fetch_history_id)
             self.update_origin_visit(
                 self.origin_id, self.visit, status=self.visit_status())
+            self.post_load()
         except Exception:
             self.log.exception('Loading failure, updating to `partial`'
                                ' status')
             self.close_fetch_history_failure(fetch_history_id)
             self.update_origin_visit(
                 self.origin_id, self.visit, status='partial')
+            self.post_load(success=False)
             return {'status': 'failed'}
         finally:
             self.flush()
             self.cleanup()

         return self.load_status()
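
Usage note (not part of the diff): the new post_load() hook gives subclasses a
single place to react to the outcome of a run, since load() calls it with
success=True after a full load and success=False when an exception was caught.
The sketch below is a minimal, hypothetical subclass; TrivialLoader and the
behavior of its stubbed methods are illustrative assumptions, not code from
this repository. It only relies on attributes the base class actually sets
(self.log, self.counters, self.origin_id) and on self.visit_date, which load()
reads and which prepare() is therefore expected to set.

    from swh.loader.core.loader import SWHLoader

    class TrivialLoader(SWHLoader):
        """Hypothetical loader sketch demonstrating the post_load() hook."""

        def prepare(self, *args, **kwargs):
            # load() reads self.visit_date, so prepare() must set it.
            self.origin = kwargs['origin']
            self.visit_date = kwargs.get('visit_date')

        def get_origin(self):
            return self.origin

        def fetch_data(self):
            return False  # no more data to fetch after the first pass

        def store_data(self):
            pass

        def cleanup(self):
            pass

        def post_load(self, success=True):
            # success=True after a full load; success=False when load()
            # caught an exception and flagged the visit as 'partial'.
            if success:
                self.log.info('Loaded origin %s: %s'
                              % (self.origin_id, self.counters))
            else:
                self.log.warning('Failed loading origin %s' % self.origin_id)

Assuming a reachable storage as per DEFAULT_CONFIG, such a subclass would be
driven like any other loader, e.g.:

    TrivialLoader('swh.loader.trivial').load(
        origin={'type': 'tar', 'url': 'http://example.com/t.tar'})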