diff --git a/debian/control b/debian/control
index 651bd31..d6e888d 100644
--- a/debian/control
+++ b/debian/control
@@ -1,25 +1,26 @@
 Source: swh-loader-dir
 Maintainer: Software Heritage developers
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python,
                python3-all,
                python3-nose,
                python3-setuptools,
                python3-swh.core (>= 0.0.14),
                python3-swh.model (>= 0.0.4),
                python3-swh.scheduler,
                python3-swh.storage (>= 0.0.31),
+               python3-swh.loader.core,
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DLDDIR/
 
 Package: python3-swh.loader.dir
 Architecture: all
 Depends: python3-swh.core (>= 0.0.14),
          python3-swh.model (>= 0.0.4),
          python3-swh.storage (>= 0.0.31),
          ${misc:Depends},
          ${python3:Depends}
 Description: Software Heritage Directory Loader
diff --git a/requirements.txt b/requirements.txt
index e2b17fa..b6498ff 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,10 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 vcversioner
 swh.core >= 0.0.14
 swh.model >= 0.0.4
 swh.scheduler
 swh.storage >= 0.0.31
+swh.loader.core
 retrying
diff --git a/swh/loader/dir/converters.py b/swh/loader/dir/converters.py
deleted file mode 100644
index 33572f2..0000000
--- a/swh/loader/dir/converters.py
+++ /dev/null
@@ -1,180 +0,0 @@
-# Copyright (C) 2015 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-"""Convert dir objects to dictionaries suitable for swh.storage"""
-
-import datetime
-import os
-
-from swh.model.hashutil import hash_to_hex
-
-from swh.model import git
-
-
-def to_datetime(ts):
-    """Convert a timestamp to utc datetime.
-
-    """
-    return datetime.datetime.utcfromtimestamp(ts).replace(
-        tzinfo=datetime.timezone.utc)
-
-
-def format_to_minutes(offset_str):
-    """Convert a git string timezone format string (e.g +0200, -0310) to minutes.
-
-    Args:
-        offset_str: a string representing an offset.
-
-    Returns:
-        A positive or negative number of minutes of such input
-
-    """
-    sign = offset_str[0]
-    hours = int(offset_str[1:3])
-    minutes = int(offset_str[3:]) + (hours * 60)
-    return minutes if sign == '+' else -1 * minutes
-
-
-def blob_to_content(obj, log=None, max_content_size=None,
-                    origin_id=None):
-    """Convert obj to a swh storage content.
-
-    Note:
-    - If obj represents a link, the length and data are already
-      provided so we use them directly.
-    - 'data' is returned only if max_content_size is not reached.
-
-    Returns:
-        obj converted to content as a dictionary.
-
-    """
-    filepath = obj['path']
-    if 'length' in obj:  # link already has it
-        size = obj['length']
-    else:
-        size = os.lstat(filepath).st_size
-
-    ret = {
-        'sha1': obj['sha1'],
-        'sha256': obj['sha256'],
-        'sha1_git': obj['sha1_git'],
-        'length': size,
-        'perms': obj['perms'].value,
-        'type': obj['type'].value,
-    }
-
-    if max_content_size and size > max_content_size:
-        if log:
-            log.info('Skipping content %s, too large (%s > %s)' %
-                     (hash_to_hex(obj['sha1_git']),
-                      size,
-                      max_content_size))
-        ret.update({'status': 'absent',
-                    'reason': 'Content too large',
-                    'origin': origin_id})
-        return ret
-
-    if 'data' in obj:  # link already has it
-        data = obj['data']
-    else:
-        data = open(filepath, 'rb').read()
-
-    ret.update({
-        'data': data,
-        'status': 'visible'
-    })
-
-    return ret
-
-
-# Map of type to swh types
-_entry_type_map = {
-    git.GitType.TREE: 'dir',
-    git.GitType.BLOB: 'file',
-    git.GitType.COMM: 'rev',
-}
-
-
-def tree_to_directory(tree, objects, log=None):
-    """Format a tree as a directory
-
-    """
-    entries = []
-    for entry in objects[tree['path']]:
-        entries.append({
-            'type': _entry_type_map[entry['type']],
-            'perms': int(entry['perms'].value),
-            'name': entry['name'],
-            'target': entry['sha1_git']
-        })
-
-    return {
-        'id': tree['sha1_git'],
-        'entries': entries
-    }
-
-
-def commit_to_revision(commit, objects, log=None):
-    """Format a commit as a revision.
-
-    """
-    upper_directory = objects[git.ROOT_TREE_KEY][0]
-    return {
-        'date': {
-            'timestamp': commit['author_date'],
-            'offset': format_to_minutes(commit['author_offset']),
-        },
-        'committer_date': {
-            'timestamp': commit['committer_date'],
-            'offset': format_to_minutes(commit['committer_offset']),
-        },
-        'type': commit['type'],
-        'directory': upper_directory['sha1_git'],
-        'message': commit['message'].encode('utf-8'),
-        'author': {
-            'name': commit['author_name'].encode('utf-8'),
-            'email': commit['author_email'].encode('utf-8'),
-        },
-        'committer': {
-            'name': commit['committer_name'].encode('utf-8'),
-            'email': commit['committer_email'].encode('utf-8'),
-        },
-        'synthetic': True,
-        'metadata': commit['metadata'],
-        'parents': [],
-    }
-
-
-def annotated_tag_to_release(release, log=None):
-    """Format a swh release.
-
-    """
-    return {
-        'target': release['target'],
-        'target_type': release['target_type'],
-        'name': release['name'].encode('utf-8'),
-        'message': release['comment'].encode('utf-8'),
-        'date': {
-            'timestamp': release['date'],
-            'offset': format_to_minutes(release['offset']),
-        },
-        'author': {
-            'name': release['author_name'].encode('utf-8'),
-            'email': release['author_email'].encode('utf-8'),
-        },
-        'synthetic': True,
-    }
-
-
-def ref_to_occurrence(ref):
-    """Format a reference as an occurrence"""
-    occ = ref.copy()
-    if 'branch' in ref:
-        branch = ref['branch']
-        if isinstance(branch, str):
-            occ['branch'] = branch.encode('utf-8')
-        else:
-            occ['branch'] = branch
-    return occ
""" - DEFAULT_CONFIG = { - 'storage_class': ('str', 'remote_storage'), - 'storage_args': ('list[str]', ['http://localhost:5000/']), - - 'send_contents': ('bool', True), - 'send_directories': ('bool', True), - 'send_revisions': ('bool', True), - 'send_releases': ('bool', True), - 'send_occurrences': ('bool', True), - - 'content_packet_size': ('int', 10000), - 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), - 'directory_packet_size': ('int', 25000), - 'revision_packet_size': ('int', 100000), - 'release_packet_size': ('int', 100000), - 'occurrence_packet_size': ('int', 100000), - } - - def __init__(self, config): - self.config = config - - if self.config['storage_class'] == 'remote_storage': - from swh.storage.api.client import RemoteStorage as Storage - else: - from swh.storage import Storage - - self.storage = Storage(*self.config['storage_args']) - - self.log = logging.getLogger('swh.loader.dir.DirLoader') - - def open_fetch_history(self, origin_id): - return self.storage.fetch_history_start(origin_id) - - def close_fetch_history(self, fetch_history_id, res): - result = None - if 'objects' in res: - result = { - 'contents': len(res['objects'].get(GitType.BLOB, [])), - 'directories': len(res['objects'].get(GitType.TREE, [])), - 'revisions': len(res['objects'].get(GitType.COMM, [])), - 'releases': len(res['objects'].get(GitType.RELE, [])), - 'occurrences': len(res['objects'].get(GitType.REFS, [])), - } - - data = { - 'status': res['status'], - 'result': result, - 'stderr': res.get('stderr') - } - return self.storage.fetch_history_end(fetch_history_id, data) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_contents(self, content_list): - """Actually send properly formatted contents to the database""" - num_contents = len(content_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - self.storage.content_add(content_list) - self.log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_directories(self, directory_list): - """Actually send properly formatted directories to the database""" - num_directories = len(directory_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - self.storage.directory_add(directory_list) - self.log.debug("Done sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_revisions(self, revision_list): - """Actually send properly formatted revisions to the database""" - num_revisions = len(revision_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - self.storage.revision_add(revision_list) - self.log.debug("Done sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_end', - 
'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_releases(self, release_list): - """Actually send properly formatted releases to the database""" - num_releases = len(release_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - self.storage.release_add(release_list) - self.log.debug("Done sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_occurrences(self, occurrence_list): - """Actually send properly formatted occurrences to the database""" - num_occurrences = len(occurrence_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - self.storage.occurrence_add(occurrence_list) - self.log.debug("Done sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - - def bulk_send_blobs(self, objects, blobs, origin_id): - """Format blobs as swh contents and send them to the database""" - packet_size = self.config['content_packet_size'] - packet_size_bytes = self.config['content_packet_size_bytes'] - max_content_size = self.config['content_size_limit'] - - send_in_packets(blobs, converters.blob_to_content, - self.send_contents, packet_size, - packet_size_bytes=packet_size_bytes, - log=self.log, - max_content_size=max_content_size, - origin_id=origin_id) - - def bulk_send_trees(self, objects, trees): - """Format trees as swh directories and send them to the database""" - packet_size = self.config['directory_packet_size'] - - send_in_packets(trees, converters.tree_to_directory, - self.send_directories, packet_size, - objects=objects, - log=self.log) - - def bulk_send_commits(self, objects, commits): - """Format commits as swh revisions and send them to the database""" - packet_size = self.config['revision_packet_size'] - - send_in_packets(commits, (lambda x, objects={}, log=None: x), - self.send_revisions, packet_size, - objects=objects, - log=self.log) - - def bulk_send_annotated_tags(self, objects, tags): - """Format annotated tags (pygit2.Tag objects) as swh releases and send - them to the database - """ - packet_size = self.config['release_packet_size'] - - send_in_packets(tags, (lambda x, objects={}, log=None: x), - self.send_releases, packet_size, - log=self.log) - - def bulk_send_refs(self, objects, refs): - """Format git references as swh occurrences and send them to the - database - """ - packet_size = self.config['occurrence_packet_size'] - send_in_packets(refs, converters.ref_to_occurrence, - self.send_occurrences, packet_size) + def __init__(self, config, + origin_id, + revision_type='dir', + logging_class='swh.loader.dir.DirLoader'): + super().__init__(config, origin_id, revision_type, logging_class) def list_repo_objs(self, dir_path, revision, release): """List all objects from dir_path. 
Args: - dir_path (path): the directory to list - revision: revision dictionary representation - release: release dictionary representation Returns: a dict containing lists of `Oid`s with keys for each object type: - CONTENT - DIRECTORY """ def get_objects_per_object_type(objects_per_path): m = { GitType.BLOB: [], GitType.TREE: [], GitType.COMM: [], GitType.RELE: [] } for tree_path in objects_per_path: objs = objects_per_path[tree_path] for obj in objs: m[obj['type']].append(obj) return m def _revision_from(tree_hash, revision, objects): full_rev = dict(revision) full_rev['directory'] = tree_hash full_rev = converters.commit_to_revision(full_rev, objects) full_rev['id'] = git.compute_revision_sha1_git(full_rev) return full_rev def _release_from(revision_hash, release): full_rel = dict(release) full_rel['target'] = revision_hash full_rel['target_type'] = 'revision' full_rel = converters.annotated_tag_to_release(full_rel) full_rel['id'] = git.compute_release_sha1_git(full_rel) return full_rel log_id = str(uuid.uuid4()) sdir_path = dir_path.decode('utf-8') self.log.info("Started listing %s" % dir_path, extra={ 'swh_type': 'dir_list_objs_start', 'swh_repo': sdir_path, 'swh_id': log_id, }) objects_per_path = git.walk_and_compute_sha1_from_directory(dir_path) objects = get_objects_per_object_type(objects_per_path) tree_hash = objects_per_path[git.ROOT_TREE_KEY][0]['sha1_git'] full_rev = _revision_from(tree_hash, revision, objects_per_path) objects[GitType.COMM] = [full_rev] if release and 'name' in release: full_rel = _release_from(full_rev['id'], release) objects[GitType.RELE] = [full_rel] self.log.info("Done listing the objects in %s: %d contents, " "%d directories, %d revisions, %d releases" % ( sdir_path, len(objects[GitType.BLOB]), len(objects[GitType.TREE]), len(objects[GitType.COMM]), len(objects[GitType.RELE]) ), extra={ 'swh_type': 'dir_list_objs_end', 'swh_repo': sdir_path, 'swh_num_blobs': len(objects[GitType.BLOB]), 'swh_num_trees': len(objects[GitType.TREE]), 'swh_num_commits': len(objects[GitType.COMM]), 'swh_num_releases': len(objects[GitType.RELE]), 'swh_id': log_id, }) return objects, objects_per_path - def load_dir(self, dir_path, objects, objects_per_path, refs, origin_id): - if self.config['send_contents']: - self.bulk_send_blobs(objects_per_path, objects[GitType.BLOB], - origin_id) - else: - self.log.info('Not sending contents') - - if self.config['send_directories']: - self.bulk_send_trees(objects_per_path, objects[GitType.TREE]) - else: - self.log.info('Not sending directories') - - if self.config['send_revisions']: - self.bulk_send_commits(objects_per_path, objects[GitType.COMM]) - else: - self.log.info('Not sending revisions') - - if self.config['send_releases']: - self.bulk_send_annotated_tags(objects_per_path, - objects[GitType.RELE]) - else: - self.log.info('Not sending releases') - - if self.config['send_occurrences']: - self.bulk_send_refs(objects_per_path, refs) - else: - self.log.info('Not sending occurrences') - def process(self, dir_path, origin, revision, release, occurrences): """Load a directory in backend. Args: - dir_path: source of the directory to import - origin: Dictionary origin - id: origin's id - url: url origin we fetched - type: type of the origin - revision: Dictionary of information needed, keys are: - author_name: revision's author name - author_email: revision's author email - author_date: timestamp (e.g. 1444054085) - author_offset: date offset e.g. 
-0220, +0100 - committer_name: revision's committer name - committer_email: revision's committer email - committer_date: timestamp - committer_offset: date offset e.g. -0220, +0100 - type: type of revision dir, tar - message: synthetic message for the revision - release: Dictionary of information needed, keys are: - name: release name - date: release timestamp (e.g. 1444054085) - offset: release date offset e.g. -0220, +0100 - author_name: release author's name - author_email: release author's email - comment: release's comment message - occurrences: List of occurrences as dictionary. Information needed, keys are: - branch: occurrence's branch name - date: validity date (e.g. 2015-01-01 00:00:00+00) Returns: Dictionary with the following keys: - status: mandatory, the status result as a boolean - stderr: optional when status is True, mandatory otherwise - objects: the actual objects sent to swh storage """ def _occurrence_from(origin_id, revision_hash, occurrence): occ = dict(occurrence) occ.update({ 'target': revision_hash, 'target_type': 'revision', 'origin': origin_id, }) return occ def _occurrences_from(origin_id, revision_hash, occurrences): - full_occs = [] + occs = [] for occurrence in occurrences: - full_occ = _occurrence_from(origin_id, - revision_hash, - occurrence) - full_occs.append(full_occ) - return full_occs + occs.append(_occurrence_from(origin_id, + revision_hash, + occurrence)) + + return occs if not os.path.exists(dir_path): warn_msg = 'Skipping inexistant directory %s' % dir_path self.log.warn(warn_msg, extra={ 'swh_type': 'dir_repo_list_refs', 'swh_repo': dir_path, 'swh_num_refs': 0, }) return {'status': False, 'stderr': warn_msg} if isinstance(dir_path, str): dir_path = dir_path.encode(sys.getfilesystemencoding()) # to load the repository, walk all objects, compute their hash objects, objects_per_path = self.list_repo_objs(dir_path, revision, release) full_rev = objects[GitType.COMM][0] # only 1 revision - full_occs = _occurrences_from(origin['id'], - full_rev['id'], - occurrences) + # Update objects with release and occurrences + objects[GitType.RELE] = [full_rev] + objects[GitType.REFS] = _occurrences_from(origin['id'], + full_rev['id'], + occurrences) - self.load_dir(dir_path, objects, objects_per_path, full_occs, - origin['id']) - - objects[GitType.REFS] = full_occs + self.load(objects, objects_per_path) + self.flush() return {'status': True, 'objects': objects} - - -class DirLoaderWithHistory(DirLoader): - """A bulk loader for a directory. - - This will: - - create the origin if it does not exist - - open an entry in fetch_history - - load the content of the directory - - close the entry in fetch_history - - """ - def __init__(self, config): - super().__init__(config) - self.log = logging.getLogger('swh.loader.dir.DirLoaderWithHistory') - - def process(self, dir_path, origin, revision, release, occurrences): - """Load a directory in backend. - - Args: - - dir_path: source of the directory to import - - origin: Dictionary origin - - url: url origin we fetched - - type: type of the origin - - revision: Dictionary of information needed, keys are: - - author_name: revision's author name - - author_email: revision's author email - - author_date: timestamp (e.g. 1444054085) - - author_offset: date offset e.g. -0220, +0100 - - committer_name: revision's committer name - - committer_email: revision's committer email - - committer_date: timestamp - - committer_offset: date offset e.g. 
-0220, +0100 - - type: type of revision dir, tar - - message: synthetic message for the revision - - release: Dictionary of information needed, keys are: - - name: release name - - date: release timestamp (e.g. 1444054085) - - offset: release date offset e.g. -0220, +0100 - - author_name: release author's name - - author_email: release author's email - - comment: release's comment message - - occurrences: List of occurrence dictionary. - Information needed, keys are: - - branch: occurrence's branch name - - date: validity date (e.g. 2015-01-01 00:00:00+00) - - """ - origin['id'] = self.storage.origin_add_one(origin) - - fetch_history_id = self.open_fetch_history(origin['id']) - - result = super().process(dir_path, origin, revision, release, - occurrences) - - self.close_fetch_history(fetch_history_id, result) diff --git a/swh/loader/dir/tasks.py b/swh/loader/dir/tasks.py index 7f46076..db2f998 100644 --- a/swh/loader/dir/tasks.py +++ b/swh/loader/dir/tasks.py @@ -1,35 +1,66 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.scheduler.task import Task +from swh.core.config import load_named_config +from swh.loader.dir.loader import DirLoader +from swh.loader.core import tasks +from swh.storage import get_storage -from swh.loader.dir.loader import DirLoaderWithHistory +DEFAULT_CONFIG = { + 'storage_class': ('str', 'remote_storage'), + 'storage_args': ('list[str]', ['http://localhost:5000/']), + 'send_contents': ('bool', True), + 'send_directories': ('bool', True), + 'send_revisions': ('bool', True), + 'send_releases': ('bool', True), + 'send_occurrences': ('bool', True), + 'content_packet_size': ('int', 10000), + 'content_packet_size': ('int', 10000), + 'content_packet_block_size_bytes': ('int', 100 * 1024 * 1024), + 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), + 'directory_packet_size': ('int', 25000), + 'revision_packet_size': ('int', 100000), + 'release_packet_size': ('int', 100000), + 'occurrence_packet_size': ('int', 100000), + } -class LoadDirRepository(Task): + +class LoadDirRepository(tasks.LoaderCoreTask): """Import a directory to Software Heritage """ task_queue = 'swh_loader_dir' - CONFIG_BASE_FILENAME = 'loader/dir.ini' - ADDITIONAL_CONFIG = {} + @property + def config(self): + if not hasattr(self, '__config'): + self.__config = load_named_config( + 'loader/svn.ini', + DEFAULT_CONFIG) - def __init__(self): - self.config = DirLoaderWithHistory.parse_config_file( - base_filename=self.CONFIG_BASE_FILENAME, - additional_configs=[self.ADDITIONAL_CONFIG], - ) + return self.__config def run(self, dir_path, origin, revision, release, occurrences): """Import a directory. Args: cf. 
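The slimmed-down loader keeps the same `process()` entry point but delegates batching, retries and sending to the `swh.loader.core` base class via `self.load()` and `self.flush()`. A hypothetical driver follows, with the argument dicts shaped after the docstrings above; all values are illustrative, the config keys mirror DEFAULT_CONFIG in tasks.py below, and a storage API is assumed to be listening at the configured URL:

```python
# Hypothetical usage sketch, not part of the patch.
from swh.loader.dir.loader import DirLoader

config = {
    'storage_class': 'remote_storage',
    'storage_args': ['http://localhost:5000/'],
    'send_contents': True, 'send_directories': True,
    'send_revisions': True, 'send_releases': True,
    'send_occurrences': True,
    'content_packet_size': 10000,
    'content_packet_size_bytes': 1024 * 1024 * 1024,
    'directory_packet_size': 25000, 'revision_packet_size': 100000,
    'release_packet_size': 100000, 'occurrence_packet_size': 100000,
}

origin = {'id': 42, 'url': 'file:///srv/example-project', 'type': 'dir'}
revision = {
    'author_name': 'swh loader', 'author_email': 'swh@example.org',
    'author_date': 1444054085, 'author_offset': '+0100',
    'committer_name': 'swh loader', 'committer_email': 'swh@example.org',
    'committer_date': 1444054085, 'committer_offset': '+0100',
    'type': 'dir', 'message': 'synthetic revision', 'metadata': {},
}
release = None  # no release for this visit
occurrences = [{'branch': 'master', 'date': '2015-01-01 00:00:00+00'}]

result = DirLoader(config, origin['id']).process(
    '/srv/example-project', origin, revision, release, occurrences)
assert result['status'] is True
```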
diff --git a/swh/loader/dir/tasks.py b/swh/loader/dir/tasks.py
index 7f46076..db2f998 100644
--- a/swh/loader/dir/tasks.py
+++ b/swh/loader/dir/tasks.py
@@ -1,35 +1,66 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from swh.scheduler.task import Task
+from swh.core.config import load_named_config
+from swh.loader.dir.loader import DirLoader
+from swh.loader.core import tasks
+from swh.storage import get_storage
 
-from swh.loader.dir.loader import DirLoaderWithHistory
 
+DEFAULT_CONFIG = {
+    'storage_class': ('str', 'remote_storage'),
+    'storage_args': ('list[str]', ['http://localhost:5000/']),
+    'send_contents': ('bool', True),
+    'send_directories': ('bool', True),
+    'send_revisions': ('bool', True),
+    'send_releases': ('bool', True),
+    'send_occurrences': ('bool', True),
+    'content_packet_size': ('int', 10000),
+    'content_packet_block_size_bytes': ('int', 100 * 1024 * 1024),
+    'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
+    'directory_packet_size': ('int', 25000),
+    'revision_packet_size': ('int', 100000),
+    'release_packet_size': ('int', 100000),
+    'occurrence_packet_size': ('int', 100000),
+}
+
 
-class LoadDirRepository(Task):
+class LoadDirRepository(tasks.LoaderCoreTask):
     """Import a directory to Software Heritage
 
     """
     task_queue = 'swh_loader_dir'
 
-    CONFIG_BASE_FILENAME = 'loader/dir.ini'
-    ADDITIONAL_CONFIG = {}
+    @property
+    def config(self):
+        if not hasattr(self, '_config'):
+            self._config = load_named_config(
+                'loader/dir.ini',
+                DEFAULT_CONFIG)
 
-    def __init__(self):
-        self.config = DirLoaderWithHistory.parse_config_file(
-            base_filename=self.CONFIG_BASE_FILENAME,
-            additional_configs=[self.ADDITIONAL_CONFIG],
-        )
+        return self._config
 
     def run(self, dir_path, origin, revision, release, occurrences):
         """Import a directory.
 
         Args: cf. swh.loader.dir.loader.run docstring
 
        """
-        loader = DirLoaderWithHistory(self.config)
-        loader.log = self.log
-        loader.process(dir_path, origin, revision, release, occurrences)
+        config = self.config
+        storage = get_storage(config['storage_class'], config['storage_args'])
+
+        origin['id'] = storage.origin_add_one(origin)
+
+        fetch_history_id = self.open_fetch_history(storage, origin['id'])
+
+        result = DirLoader(config, origin['id']).process(dir_path,
+                                                         origin,
+                                                         revision,
+                                                         release,
+                                                         occurrences)
+
+        self.close_fetch_history(storage, fetch_history_id, result)
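The `('type', default)` pairs in DEFAULT_CONFIG follow the swh.core.config convention: the second element is the fallback used when the named config file does not override the key, and the first declares how a file-supplied string should be coerced. A minimal stand-in to illustrate the resolution (not the actual swh.core implementation, and only the 'int' coercion is sketched):

```python
# Illustrative stand-in for the typed-default resolution performed by
# swh.core.config.load_named_config.

def resolve(default_config, file_values):
    config = {}
    for key, (type_, default) in default_config.items():
        if key in file_values:
            raw = file_values[key]
            config[key] = int(raw) if type_ == 'int' else raw
        else:
            config[key] = default
    return config

defaults = {'directory_packet_size': ('int', 25000)}
assert resolve(defaults, {}) == {'directory_packet_size': 25000}
assert resolve(defaults, {'directory_packet_size': '1000'}) == \
    {'directory_packet_size': 1000}
```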
diff --git a/swh/loader/dir/tests/test_converters.py b/swh/loader/dir/tests/test_converters.py
deleted file mode 100644
index 0643953..0000000
--- a/swh/loader/dir/tests/test_converters.py
+++ /dev/null
@@ -1,337 +0,0 @@
-# Copyright (C) 2015 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import os
-import shutil
-import tempfile
-import unittest
-
-from nose.tools import istest
-
-from swh.loader.dir import converters
-from swh.model import git
-
-
-def tmpfile_with_content(fromdir, contentfile):
-    """Create a temporary file with content contentfile in directory fromdir.
-
-    """
-    tmpfilepath = tempfile.mktemp(
-        suffix='.swh',
-        prefix='tmp-file-for-test',
-        dir=fromdir)
-
-    with open(tmpfilepath, 'wb') as f:
-        f.write(contentfile)
-
-    return tmpfilepath
-
-
-class TestConverters(unittest.TestCase):
-
-    @classmethod
-    def setUpClass(cls):
-        super().setUpClass()
-        cls.tmpdir = tempfile.mkdtemp(prefix='test-swh-loader-dir.')
-
-    @classmethod
-    def tearDownClass(cls):
-        shutil.rmtree(cls.tmpdir)
-        super().tearDownClass()
-
-    @istest
-    def format_to_minutes(self):
-        self.assertEquals(converters.format_to_minutes('+0100'), 60)
-        self.assertEquals(converters.format_to_minutes('-0200'), -120)
-        self.assertEquals(converters.format_to_minutes('+1250'), 12*60+50)
-        self.assertEquals(converters.format_to_minutes('+0000'), 0)
-        self.assertEquals(converters.format_to_minutes('-0000'), 0)
-
-    @istest
-    def annotated_tag_to_release(self):
-        # given
-        release = {
-            'id': '123',
-            'target': '456',
-            'target_type': 'revision',
-            'name': 'some-release',
-            'comment': 'some-comment-on-release',
-            'date': 1444054085,
-            'offset': '-0300',
-            'author_name': 'someone',
-            'author_email': 'someone@whatelse.eu',
-        }
-
-        expected_release = {
-            'target': '456',
-            'target_type': 'revision',
-            'name': b'some-release',
-            'message': b'some-comment-on-release',
-            'date': {
-                'timestamp': 1444054085,
-                'offset': -180
-            },
-            'author': {
-                'name': b'someone',
-                'email': b'someone@whatelse.eu',
-            },
-            'synthetic': True,
-        }
-
-        # when
-        actual_release = converters.annotated_tag_to_release(release)
-
-        # then
-        self.assertDictEqual(actual_release, expected_release)
-
-    @istest
-    def blob_to_content_visible_data(self):
-        # given
-        contentfile = b'temp file for testing blob to content conversion'
-        tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile)
-
-        obj = {
-            'path': tmpfilepath,
-            'perms': git.GitPerm.BLOB,
-            'type': git.GitType.BLOB,
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-        }
-
-        expected_blob = {
-            'data': contentfile,
-            'length': len(contentfile),
-            'status': 'visible',
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-            'perms': git.GitPerm.BLOB.value,
-            'type': git.GitType.BLOB.value,
-        }
-
-        # when
-        actual_blob = converters.blob_to_content(obj)
-
-        # then
-        self.assertEqual(actual_blob, expected_blob)
-
-    @istest
-    def blob_to_content_link(self):
-        # given
-        contentfile = b'temp file for testing blob to content conversion'
-        tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile)
-        tmplinkpath = tempfile.mktemp(dir=self.tmpdir)
-        os.symlink(tmpfilepath, tmplinkpath)
-
-        obj = {
-            'path': tmplinkpath,
-            'perms': git.GitPerm.BLOB,
-            'type': git.GitType.BLOB,
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-        }
-
-        expected_blob = {
-            'data': contentfile,
-            'length': len(tmpfilepath),
-            'status': 'visible',
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-            'perms': git.GitPerm.BLOB.value,
-            'type': git.GitType.BLOB.value,
-        }
-
-        # when
-        actual_blob = converters.blob_to_content(obj)
-
-        # then
-        self.assertEqual(actual_blob, expected_blob)
-
-    @istest
-    def blob_to_content_link_with_data_length_populated(self):
-        # given
-        tmplinkpath = tempfile.mktemp(dir=self.tmpdir)
-        obj = {
-            'length': 10,  # wrong for test purposes
-            'data': 'something wrong',  # again for test purposes
-            'path': tmplinkpath,
-            'perms': git.GitPerm.BLOB,
-            'type': git.GitType.BLOB,
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-        }
-
-        expected_blob = {
-            'length': 10,
-            'data': 'something wrong',
-            'status': 'visible',
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-            'perms': git.GitPerm.BLOB.value,
-            'type': git.GitType.BLOB.value,
-        }
-
-        # when
-        actual_blob = converters.blob_to_content(obj)
-
-        # then
-        self.assertEqual(actual_blob, expected_blob)
-
-    @istest
-    def blob_to_content2_absent_data(self):
-        # given
-        contentfile = b'temp file for testing blob to content conversion'
-        tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile)
-
-        obj = {
-            'path': tmpfilepath,
-            'perms': git.GitPerm.BLOB,
-            'type': git.GitType.BLOB,
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-        }
-
-        expected_blob = {
-            'length': len(contentfile),
-            'status': 'absent',
-            'sha1': 'some-sha1',
-            'sha256': 'some-sha256',
-            'sha1_git': 'some-sha1git',
-            'perms': git.GitPerm.BLOB.value,
-            'type': git.GitType.BLOB.value,
-            'reason': 'Content too large',
-            'origin': 190
-        }
-
-        # when
-        actual_blob = converters.blob_to_content(obj, None,
-                                                 max_content_size=10,
-                                                 origin_id=190)
-
-        # then
-        self.assertEqual(actual_blob, expected_blob)
-
-    @istest
-    def tree_to_directory_no_entries(self):
-        # given
-        tree = {
-            'path': 'foo',
-            'sha1_git': b'tree_sha1_git'
-        }
-        objects = {
-            'foo': [{'type': git.GitType.TREE,
-                     'perms': git.GitPerm.TREE,
-                     'name': 'bar',
-                     'sha1_git': b'sha1-target'},
-                    {'type': git.GitType.BLOB,
-                     'perms': git.GitPerm.BLOB,
-                     'name': 'file-foo',
-                     'sha1_git': b'file-foo-sha1-target'}]
-        }
-
-        expected_directory = {
-            'id': b'tree_sha1_git',
-            'entries': [{'type': 'dir',
-                         'perms': int(git.GitPerm.TREE.value),
-                         'name': 'bar',
-                         'target': b'sha1-target'},
-                        {'type': 'file',
-                         'perms': int(git.GitPerm.BLOB.value),
-                         'name': 'file-foo',
-                         'target': b'file-foo-sha1-target'}]
-        }
-
-        # when
-        actual_directory = converters.tree_to_directory(tree, objects)
-
-        # then
-        self.assertEqual(actual_directory, expected_directory)
-
-    @istest
-    def commit_to_revision(self):
-        # given
-        commit = {
-            'sha1_git': 'commit-git-sha1',
-            'author_date': 1444054085,
-            'author_offset': '+0000',
-            'committer_date': 1444054085,
-            'committer_offset': '-0000',
-            'type': 'tar',
-            'message': 'synthetic-message-input',
-            'author_name': 'author-name',
-            'author_email': 'author-email',
-            'committer_name': 'committer-name',
-            'committer_email': 'committer-email',
-            'metadata': {'checksums': {'sha1': b'sha1-as-bytes'}},
-            'directory': 'targeted-tree-sha1',
-        }
-
-        objects = {
-            git.ROOT_TREE_KEY: [{'sha1_git': 'targeted-tree-sha1'}]
-        }
-
-        expected_revision = {
-            'date': {
-                'timestamp': 1444054085,
-                'offset': 0,
-            },
-            'committer_date': {
-                'timestamp': 1444054085,
-                'offset': 0,
-            },
-            'type': 'tar',
-            'directory': 'targeted-tree-sha1',
-            'message': b'synthetic-message-input',
-            'author': {
-                'name': b'author-name',
-                'email': b'author-email',
-            },
-            'committer': {
-                'name': b'committer-name',
-                'email': b'committer-email',
-            },
-            'synthetic': True,
-            'metadata': {'checksums': {'sha1': b'sha1-as-bytes'}},
-            'parents': [],
-        }
-
-        # when
-        actual_revision = converters.commit_to_revision(commit, objects)
-
-        # then
-        self.assertEquals(actual_revision, expected_revision)
-
-    @istest
-    def ref_to_occurrence_1(self):
-        # when
-        actual_occ = converters.ref_to_occurrence({
-            'id': 'some-id',
-            'branch': 'some/branch'
-        })
-        # then
-        self.assertEquals(actual_occ, {
-            'id': 'some-id',
-            'branch': b'some/branch'
-        })
-
-    @istest
-    def ref_to_occurrence_2(self):
-        # when
-        actual_occ = converters.ref_to_occurrence({
-            'id': 'some-id',
-            'branch': b'some/branch'
-        })
-
-        # then
-        self.assertEquals(actual_occ, {
-            'id': 'some-id',
-            'branch': b'some/branch'
-        })
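With the local test suite removed, coverage for the converters presumably belongs with swh.loader.core; an equivalent smoke test against the relocated module could look like the sketch below. The module path is inferred from the new import in loader.py, and `annotated_tag_to_release` is assumed to have moved unchanged; the expected values are the ones from the deleted test_converters.py above.

```python
# Hypothetical smoke test for the relocated converters.
import unittest

from swh.loader.core import converters


class TestRelocatedConverters(unittest.TestCase):
    def test_annotated_tag_to_release(self):
        release = {
            'id': '123', 'target': '456', 'target_type': 'revision',
            'name': 'some-release', 'comment': 'some-comment-on-release',
            'date': 1444054085, 'offset': '-0300',
            'author_name': 'someone', 'author_email': 'someone@whatelse.eu',
        }
        actual = converters.annotated_tag_to_release(release)
        self.assertEqual(actual['date'],
                         {'timestamp': 1444054085, 'offset': -180})
        self.assertEqual(actual['name'], b'some-release')


if __name__ == '__main__':
    unittest.main()
```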