diff --git a/debian/control b/debian/control
--- a/debian/control
+++ b/debian/control
@@ -11,6 +11,7 @@
                python3-retrying,
                python3-setuptools,
                python3-swh.core (>= 0.0.7~),
+               python3-swh.loader.core (>= 0.0.22~),
                python3-swh.model (>= 0.0.15~),
                python3-swh.scheduler (>= 0.0.14~),
                python3-swh.storage (>= 0.0.83~),
@@ -21,6 +22,7 @@
 Package: python3-swh.loader.git
 Architecture: all
 Depends: python3-swh.core (>= 0.0.7~),
+         python3-swh.loader.core (>= 0.0.22~),
         python3-swh.model (>= 0.0.15~),
         python3-swh.scheduler (>= 0.0.14~),
         python3-swh.storage (>= 0.0.83~),
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,5 @@
 swh.core >= 0.0.7
+swh.loader.core >= 0.0.22
 swh.model >= 0.0.15
 swh.scheduler >= 0.0.14
 swh.storage >= 0.0.83
diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py
--- a/swh/loader/git/base.py
+++ b/swh/loader/git/base.py
@@ -3,72 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import datetime
-import logging
+import abc
 import os
-import traceback
-import uuid
 
-import psycopg2
-import requests
-from retrying import retry
+from swh.loader.core.loader import SWHLoader
 
-from swh.core import config
-from swh.storage import get_storage
 
-
-def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
-    """Send `objects`, using the `sender`, in packets of `packet_size` objects (and
-    of max `packet_size_bytes`).
-    """
-    formatted_objects = []
-    count = 0
-    if not packet_size_bytes:
-        packet_size_bytes = 0
-    for obj in objects:
-        if not obj:
-            continue
-        formatted_objects.append(obj)
-        if packet_size_bytes:
-            count += obj['length']
-        if len(formatted_objects) >= packet_size or count > packet_size_bytes:
-            sender(formatted_objects)
-            formatted_objects = []
-            count = 0
-
-    if formatted_objects:
-        sender(formatted_objects)
-
-
-def retry_loading(error):
-    """Retry policy when we catch a recoverable error"""
-    exception_classes = [
-        # raised when two parallel insertions insert the same data.
-        psycopg2.IntegrityError,
-        # raised when uWSGI restarts and hungs up on the worker.
-        requests.exceptions.ConnectionError,
-    ]
-
-    if not any(isinstance(error, exc) for exc in exception_classes):
-        return False
-
-    logger = logging.getLogger('swh.loader.git.BulkLoader')
-
-    error_name = error.__module__ + '.' + error.__class__.__name__
-    logger.warning('Retry loading a batch', exc_info=False, extra={
-        'swh_type': 'storage_retry',
-        'swh_exception_type': error_name,
-        'swh_exception': traceback.format_exception(
-            error.__class__,
-            error,
-            error.__traceback__,
-        ),
-    })
-
-    return True
-
-
-class BaseLoader(config.SWHConfig):
+class BaseLoader(SWHLoader):
     """This base class is a pattern for loaders.
 
     The external calling convention is as such:
@@ -91,9 +32,6 @@
 
     - eventful returns whether the load was eventful or not
     """
-
-    CONFIG_BASE_FILENAME = None
-
     DEFAULT_CONFIG = {
         'storage': ('dict', {
             'cls': 'remote',
@@ -118,11 +56,8 @@
         'occurrence_packet_size': ('int', 100000),
     }
 
-    ADDITIONAL_CONFIG = {}
-
     def __init__(self):
-        self.config = self.parse_config_file(
-            additional_configs=[self.ADDITIONAL_CONFIG])
+        super().__init__(logging_class='swh.loader.git.BulkLoader')
 
         # Make sure the config is sane
         if self.config['save_data']:
@@ -131,51 +66,35 @@
             if not os.access(path, os.R_OK | os.W_OK):
                 raise PermissionError("Permission denied: %r" % path)
 
-        self.storage = get_storage(**self.config['storage'])
-
-        self.log = logging.getLogger('swh.loader.git.BulkLoader')
-
-        self.fetch_date = None  # possibly overridden in self.prepare method
-
-    def prepare(self, *args, **kwargs):
-        """Prepare the data source to be loaded"""
-        raise NotImplementedError
-
-    def cleanup(self):
-        """Clean up an eventual state installed for computations."""
-        pass
-
-    def get_origin(self):
-        """Get the origin that is currently being loaded"""
-        raise NotImplementedError
-
-    def fetch_data(self):
-        """Fetch the data from the data source"""
-        raise NotImplementedError
+        self.visit_date = None  # possibly overridden in self.prepare method
 
+    @abc.abstractmethod
     def has_contents(self):
         """Checks whether we need to load contents"""
-        return True
+        pass
 
     def get_contents(self):
         """Get the contents that need to be loaded"""
         raise NotImplementedError
 
+    @abc.abstractmethod
     def has_directories(self):
         """Checks whether we need to load directories"""
-        return True
+        pass
 
     def get_directories(self):
         """Get the directories that need to be loaded"""
         raise NotImplementedError
 
+    @abc.abstractmethod
     def has_revisions(self):
         """Checks whether we need to load revisions"""
-        return True
+        pass
 
     def get_revisions(self):
         """Get the revisions that need to be loaded"""
         raise NotImplementedError
 
+    @abc.abstractmethod
     def has_releases(self):
         """Checks whether we need to load releases"""
-        return True
+        pass
@@ -184,9 +103,10 @@
         """Get the releases that need to be loaded"""
         raise NotImplementedError
 
+    @abc.abstractmethod
     def has_occurrences(self):
         """Checks whether we need to load occurrences"""
-        return True
+        pass
 
     def get_occurrences(self):
         """Get the occurrences that need to be loaded"""
@@ -208,7 +128,7 @@
         """The path to which we save the data"""
         if not hasattr(self, '__save_data_path'):
             origin_id = self.origin_id
-            year = str(self.fetch_date.year)
+            year = str(self.visit_date.year)
 
             path = os.path.join(
                 self.config['save_data_path'],
@@ -222,234 +142,27 @@
 
         return self.__save_data_path
 
-    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
-    def send_contents(self, content_list):
-        """Actually send properly formatted contents to the database"""
-        num_contents = len(content_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d contents" % num_contents,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'content',
-                           'swh_num': num_contents,
-                           'swh_id': log_id,
-                       })
-        self.storage.content_add(content_list)
-        self.log.debug("Done sending %d contents" % num_contents,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'content',
-                           'swh_num': num_contents,
-                           'swh_id': log_id,
-                       })
-
-    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
-    def send_directories(self, directory_list):
-        """Actually send properly formatted directories to the database"""
-        num_directories = len(directory_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d directories" % num_directories,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'directory',
-                           'swh_num': num_directories,
-                           'swh_id': log_id,
-                       })
-        self.storage.directory_add(directory_list)
-        self.log.debug("Done sending %d directories" % num_directories,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'directory',
-                           'swh_num': num_directories,
-                           'swh_id': log_id,
-                       })
-
-    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
-    def send_revisions(self, revision_list):
-        """Actually send properly formatted revisions to the database"""
-        num_revisions = len(revision_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d revisions" % num_revisions,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'revision',
-                           'swh_num': num_revisions,
-                           'swh_id': log_id,
-                       })
-        self.storage.revision_add(revision_list)
-        self.log.debug("Done sending %d revisions" % num_revisions,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'revision',
-                           'swh_num': num_revisions,
-                           'swh_id': log_id,
-                       })
-
-    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
-    def send_releases(self, release_list):
-        """Actually send properly formatted releases to the database"""
-        num_releases = len(release_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d releases" % num_releases,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'release',
-                           'swh_num': num_releases,
-                           'swh_id': log_id,
-                       })
-        self.storage.release_add(release_list)
-        self.log.debug("Done sending %d releases" % num_releases,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'release',
-                           'swh_num': num_releases,
-                           'swh_id': log_id,
-                       })
-
-    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
-    def send_occurrences(self, occurrence_list):
-        """Actually send properly formatted occurrences to the database"""
-        num_occurrences = len(occurrence_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d occurrences" % num_occurrences,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'occurrence',
-                           'swh_num': num_occurrences,
-                           'swh_id': log_id,
-                       })
-        self.storage.occurrence_add(occurrence_list)
-        self.log.debug("Done sending %d occurrences" % num_occurrences,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'occurrence',
-                           'swh_num': num_occurrences,
-                           'swh_id': log_id,
-                       })
-
-    def send_origin(self, origin):
-        log_id = str(uuid.uuid4())
-        self.log.debug('Creating %s origin for %s' % (origin['type'],
-                                                      origin['url']),
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'origin',
-                           'swh_num': 1,
-                           'swh_id': log_id
-                       })
-        origin_id = self.storage.origin_add_one(origin)
-        self.log.debug('Done creating %s origin for %s' % (origin['type'],
-                                                           origin['url']),
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'origin',
-                           'swh_num': 1,
-                           'swh_id': log_id
-                       })
-
-        return origin_id
-
-    def send_all_contents(self, contents):
-        """Send all the contents to the database"""
-        packet_size = self.config['content_packet_size']
-        packet_size_bytes = self.config['content_packet_size_bytes']
-
-        send_in_packets(contents, self.send_contents, packet_size,
-                        packet_size_bytes=packet_size_bytes)
-
-    def send_all_directories(self, directories):
-        """Send all the directories to the database"""
-        packet_size = self.config['directory_packet_size']
-
-        send_in_packets(directories, self.send_directories, packet_size)
-
-    def send_all_revisions(self, revisions):
-        """Send all the revisions to the database"""
-        packet_size = self.config['revision_packet_size']
-
-        send_in_packets(revisions, self.send_revisions, packet_size)
-
-    def send_all_releases(self, releases):
-        """Send all the releases to the database
-        """
-        packet_size = self.config['release_packet_size']
-
-        send_in_packets(releases, self.send_releases, packet_size)
+    def cleanup(self):
+        """Clean up any state installed for the computation.
+        Nothing specific is needed for the git loader.
 
-    def send_all_occurrences(self, occurrences):
-        """Send all the occurrences to the database
         """
-        packet_size = self.config['occurrence_packet_size']
-
-        send_in_packets(occurrences, self.send_occurrences, packet_size)
-
-    def open_fetch_history(self):
-        return self.storage.fetch_history_start(self.origin_id)
-
-    def close_fetch_history_success(self, fetch_history_id, result):
-        data = {
-            'status': True,
-            'result': result,
-        }
-        return self.storage.fetch_history_end(fetch_history_id, data)
-
-    def close_fetch_history_failure(self, fetch_history_id):
-        import traceback
-        data = {
-            'status': False,
-            'stderr': traceback.format_exc(),
-        }
-        return self.storage.fetch_history_end(fetch_history_id, data)
-
-    def load(self, *args, **kwargs):
-
-        self.prepare(*args, **kwargs)
-        origin = self.get_origin()
-        self.origin_id = self.send_origin(origin)
-
-        fetch_history_id = self.open_fetch_history()
-        if self.fetch_date:  # overwriting the visit_date with the fetch date
-            date_visit = self.fetch_date
-        else:
-            date_visit = datetime.datetime.now(tz=datetime.timezone.utc)
-
-        origin_visit = self.storage.origin_visit_add(
-            self.origin_id,
-            date_visit)
-        self.visit = origin_visit['visit']
-
-        try:
-            self.fetch_data()
-
-            if self.config['save_data']:
-                self.save_data()
-
-            if self.config['send_contents'] and self.has_contents():
-                self.send_all_contents(self.get_contents())
-
-            if self.config['send_directories'] and self.has_directories():
-                self.send_all_directories(self.get_directories())
-
-            if self.config['send_revisions'] and self.has_revisions():
-                self.send_all_revisions(self.get_revisions())
-
-            if self.config['send_releases'] and self.has_releases():
-                self.send_all_releases(self.get_releases())
-
-            if self.config['send_occurrences'] and self.has_occurrences():
-                self.send_all_occurrences(self.get_occurrences())
-
-            self.close_fetch_history_success(fetch_history_id,
-                                             self.get_fetch_history_result())
-            self.storage.origin_visit_update(
-                self.origin_id, self.visit, status='full')
+        pass
 
-        except:
-            self.close_fetch_history_failure(fetch_history_id)
-            self.storage.origin_visit_update(
-                self.origin_id, self.visit, status='partial')
-            raise
-        finally:
-            self.cleanup()
+    def store_data(self):
+        """Store data fetched from the git repository.
 
-        return self.eventful()
+        """
+        if self.config['save_data']:
+            self.save_data()
+
+        if self.config['send_contents'] and self.has_contents():
+            self.send_batch_contents(self.get_contents())
+        if self.config['send_directories'] and self.has_directories():
+            self.send_batch_directories(self.get_directories())
+        if self.config['send_revisions'] and self.has_revisions():
+            self.send_batch_revisions(self.get_revisions())
+        if self.config['send_releases'] and self.has_releases():
+            self.send_batch_releases(self.get_releases())
+        if self.config['send_occurrences'] and self.has_occurrences():
+            self.send_batch_occurrences(self.get_occurrences())
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -20,10 +20,11 @@
 
     CONFIG_BASE_FILENAME = 'loader/git-loader'
 
-    def prepare(self, origin_url, directory, fetch_date):
+    def prepare(self, origin_url, directory, visit_date):
         self.origin_url = origin_url
+        self.origin = self.get_origin()
         self.repo = dulwich.repo.Repo(directory)
-        self.fetch_date = fetch_date
+        self.visit_date = visit_date
 
     def get_origin(self):
         """Get the origin that is currently being loaded"""
@@ -57,7 +58,6 @@
     def get_content_ids(self):
         """Get the content identifiers from the git repository"""
         for oid in self.type_to_ids[b'blob']:
-
             yield converters.dulwich_blob_to_content_id(self.repo[oid])
 
     def get_contents(self):
@@ -177,7 +177,7 @@
         """
         return os.path.basename(os.path.dirname(archive_path))
 
-    def prepare(self, origin_url, archive_path, fetch_date):
+    def prepare(self, origin_url, archive_path, visit_date):
        """1. Uncompress the archive in temporary location.
           2. Prepare as the GitLoader does
           3. Load as GitLoader does
@@ -189,7 +189,7 @@
 
         self.log.info('Project %s - Uncompressing archive %s at %s' % (
             origin_url, os.path.basename(archive_path), self.repo_path))
-        super().prepare(origin_url, self.repo_path, fetch_date)
+        super().prepare(origin_url, self.repo_path, visit_date)
 
     def cleanup(self):
         """Cleanup the temporary location (if it exists).
@@ -213,6 +213,6 @@
     origin_url = sys.argv[1]
     directory = sys.argv[2]
 
-    fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
 
-    print(loader.load(origin_url, directory, fetch_date))
+    print(loader.load(origin_url, directory, visit_date))
diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py
--- a/swh/loader/git/updater.py
+++ b/swh/loader/git/updater.py
@@ -238,7 +238,7 @@
         return id_to_type, type_to_ids
 
     def prepare(self, origin_url, base_url=None):
-        self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)
+        self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
 
         origin = converters.origin_url_to_origin(origin_url)
         base_origin = converters.origin_url_to_origin(base_url)
@@ -304,8 +304,8 @@
         write_size = 8192
         pack_dir = self.get_save_data_path()
 
-        pack_name = "%s.pack" % self.fetch_date.isoformat()
-        refs_name = "%s.refs" % self.fetch_date.isoformat()
+        pack_name = "%s.pack" % self.visit_date.isoformat()
+        refs_name = "%s.refs" % self.visit_date.isoformat()
 
         with open(os.path.join(pack_dir, pack_name), 'xb') as f:
             while True:
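
For orientation, here is a minimal sketch (not part of the patch) of what a concrete loader looks like against the refactored BaseLoader. It assumes, as the removed load() method suggests, that swh.loader.core's SWHLoader drives prepare(), fetch_data() and store_data() in that order and keeps the origin-visit and fetch-history bookkeeping to itself; the InMemoryLoader class and its objects argument are hypothetical, for illustration only.

    # Hypothetical illustration only -- not part of this change. It shows
    # the contract the refactored BaseLoader now imposes: the has_* methods
    # are abstract, so a concrete loader must answer each one explicitly
    # instead of inheriting a blanket `return True`.

    from swh.loader.git.base import BaseLoader


    class InMemoryLoader(BaseLoader):
        """Toy loader feeding pre-computed objects through the base class."""

        def prepare(self, origin_url, objects, visit_date):
            # `objects` maps a type name ('content', 'directory', ...) to
            # the list of dicts the matching get_* method should return.
            self.origin_url = origin_url
            self.objects = objects
            self.visit_date = visit_date

        def get_origin(self):
            return {'type': 'git', 'url': self.origin_url}

        def fetch_data(self):
            pass  # nothing to fetch; everything was handed to prepare()

        def has_contents(self):
            return bool(self.objects.get('content'))

        def get_contents(self):
            return self.objects.get('content', [])

        def has_directories(self):
            return bool(self.objects.get('directory'))

        def get_directories(self):
            return self.objects.get('directory', [])

        def has_revisions(self):
            return bool(self.objects.get('revision'))

        def get_revisions(self):
            return self.objects.get('revision', [])

        def has_releases(self):
            return bool(self.objects.get('release'))

        def get_releases(self):
            return self.objects.get('release', [])

        def has_occurrences(self):
            return bool(self.objects.get('occurrence'))

        def get_occurrences(self):
            return self.objects.get('occurrence', [])

Depending on what SWHLoader expects of its subclasses, further hooks such as get_fetch_history_result() may also need to be provided; the removed load() method called it before closing the fetch history.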
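On the calling side nothing changes: load() is now inherited from swh.loader.core rather than defined in base.py, and the entry point stays as in loader.py's __main__ block. A sketch of an invocation, where the repository URL and path are placeholders and the GitLoader name is taken from the docstring in loader.py:

    import datetime

    from swh.loader.git.loader import GitLoader  # name per the docstring above

    loader = GitLoader()
    visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
    # load() now comes from swh.loader.core's SWHLoader; it is expected to
    # create the origin visit, call prepare()/fetch_data()/store_data(),
    # and report whether the visit was eventful.
    print(loader.load('https://example.org/repo.git', '/tmp/repo', visit_date))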