diff --git a/swh/loader/core/__init__.py b/swh/loader/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/loader/core/converters.py b/swh/loader/core/converters.py new file mode 100644 index 0000000..33572f2 --- /dev/null +++ b/swh/loader/core/converters.py @@ -0,0 +1,180 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Convert dir objects to dictionaries suitable for swh.storage""" + +import datetime +import os + +from swh.model.hashutil import hash_to_hex + +from swh.model import git + + +def to_datetime(ts): + """Convert a timestamp to utc datetime. + + """ + return datetime.datetime.utcfromtimestamp(ts).replace( + tzinfo=datetime.timezone.utc) + + +def format_to_minutes(offset_str): + """Convert a git string timezone format string (e.g +0200, -0310) to minutes. + + Args: + offset_str: a string representing an offset. + + Returns: + A positive or negative number of minutes of such input + + """ + sign = offset_str[0] + hours = int(offset_str[1:3]) + minutes = int(offset_str[3:]) + (hours * 60) + return minutes if sign == '+' else -1 * minutes + + +def blob_to_content(obj, log=None, max_content_size=None, + origin_id=None): + """Convert obj to a swh storage content. + + Note: + - If obj represents a link, the length and data are already + provided so we use them directly. + - 'data' is returned only if max_content_size is not reached. + + Returns: + obj converted to content as a dictionary. + + """ + filepath = obj['path'] + if 'length' in obj: # link already has it + size = obj['length'] + else: + size = os.lstat(filepath).st_size + + ret = { + 'sha1': obj['sha1'], + 'sha256': obj['sha256'], + 'sha1_git': obj['sha1_git'], + 'length': size, + 'perms': obj['perms'].value, + 'type': obj['type'].value, + } + + if max_content_size and size > max_content_size: + if log: + log.info('Skipping content %s, too large (%s > %s)' % + (hash_to_hex(obj['sha1_git']), + size, + max_content_size)) + ret.update({'status': 'absent', + 'reason': 'Content too large', + 'origin': origin_id}) + return ret + + if 'data' in obj: # link already has it + data = obj['data'] + else: + data = open(filepath, 'rb').read() + + ret.update({ + 'data': data, + 'status': 'visible' + }) + + return ret + + +# Map of type to swh types +_entry_type_map = { + git.GitType.TREE: 'dir', + git.GitType.BLOB: 'file', + git.GitType.COMM: 'rev', +} + + +def tree_to_directory(tree, objects, log=None): + """Format a tree as a directory + + """ + entries = [] + for entry in objects[tree['path']]: + entries.append({ + 'type': _entry_type_map[entry['type']], + 'perms': int(entry['perms'].value), + 'name': entry['name'], + 'target': entry['sha1_git'] + }) + + return { + 'id': tree['sha1_git'], + 'entries': entries + } + + +def commit_to_revision(commit, objects, log=None): + """Format a commit as a revision. 
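+
+    The root directory is taken from objects[git.ROOT_TREE_KEY][0], which
+    the caller must provide. A hedged sketch of the expected input shapes
+    (field values below are purely illustrative):
+
+        objects = {git.ROOT_TREE_KEY: [{'sha1_git': 'root-tree-sha1'}]}
+        commit = {'author_date': 1444054085, 'author_offset': '+0000',
+                  'committer_date': 1444054085, 'committer_offset': '+0000',
+                  'type': 'tar', 'message': 'synthetic import',
+                  'author_name': 'someone', 'author_email': 'someone@example.org',
+                  'committer_name': 'someone', 'committer_email': 'someone@example.org',
+                  'metadata': None}
+        revision = commit_to_revision(commit, objects)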
+ + """ + upper_directory = objects[git.ROOT_TREE_KEY][0] + return { + 'date': { + 'timestamp': commit['author_date'], + 'offset': format_to_minutes(commit['author_offset']), + }, + 'committer_date': { + 'timestamp': commit['committer_date'], + 'offset': format_to_minutes(commit['committer_offset']), + }, + 'type': commit['type'], + 'directory': upper_directory['sha1_git'], + 'message': commit['message'].encode('utf-8'), + 'author': { + 'name': commit['author_name'].encode('utf-8'), + 'email': commit['author_email'].encode('utf-8'), + }, + 'committer': { + 'name': commit['committer_name'].encode('utf-8'), + 'email': commit['committer_email'].encode('utf-8'), + }, + 'synthetic': True, + 'metadata': commit['metadata'], + 'parents': [], + } + + +def annotated_tag_to_release(release, log=None): + """Format a swh release. + + """ + return { + 'target': release['target'], + 'target_type': release['target_type'], + 'name': release['name'].encode('utf-8'), + 'message': release['comment'].encode('utf-8'), + 'date': { + 'timestamp': release['date'], + 'offset': format_to_minutes(release['offset']), + }, + 'author': { + 'name': release['author_name'].encode('utf-8'), + 'email': release['author_email'].encode('utf-8'), + }, + 'synthetic': True, + } + + +def ref_to_occurrence(ref): + """Format a reference as an occurrence""" + occ = ref.copy() + if 'branch' in ref: + branch = ref['branch'] + if isinstance(branch, str): + occ['branch'] = branch.encode('utf-8') + else: + occ['branch'] = branch + return occ diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py new file mode 100644 index 0000000..9f241c6 --- /dev/null +++ b/swh/loader/core/loader.py @@ -0,0 +1,426 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import psycopg2 +import requests +import traceback +import uuid + +from retrying import retry + +from swh.core import config + +from swh.loader.dir import converters +from swh.model.git import GitType +from swh.storage import get_storage + +from swh.loader.core.queue import QueuePerSizeAndNbUniqueElements +from swh.loader.core.queue import QueuePerNbUniqueElements +from swh.loader.core.queue import QueuePerNbElements + + +def retry_loading(error): + """Retry policy when the database raises an integrity error""" + exception_classes = [ + # raised when two parallel insertions insert the same data. + psycopg2.IntegrityError, + # raised when uWSGI restarts and hungs up on the worker. + requests.exceptions.ConnectionError, + ] + + if not any(isinstance(error, exc) for exc in exception_classes): + return False + + logger = logging.getLogger('swh.loader') + + error_name = error.__module__ + '.' + error.__class__.__name__ + logger.warning('Retry loading a batch', exc_info=False, extra={ + 'swh_type': 'storage_retry', + 'swh_exception_type': error_name, + 'swh_exception': traceback.format_exception( + error.__class__, + error, + error.__traceback__, + ), + }) + + return True + + +def shallow_blob(obj): + """Convert a full swh content/blob to just what's needed by + swh-storage for filtering. + + Returns: + A shallow copy of a full swh content/blob object. 
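+
+        Only the keys swh-storage needs for content filtering are kept
+        ('sha1', 'sha256', 'sha1_git' and 'length'); other fields such as
+        'data', 'path' or 'perms' are dropped.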
+ + """ + return { + 'sha1': obj['sha1'], + 'sha256': obj['sha256'], + 'sha1_git': obj['sha1_git'], + 'length': obj['length'] + } + + +def shallow_tree(tree): + """Convert a full swh directory/tree to just what's needed by + swh-storage for filtering. + + Returns: + A shallow copy of a full swh directory/tree object. + + """ + return tree['sha1_git'] + + +def shallow_commit(commit): + """Convert a full swh revision/commit to just what's needed by + swh-storage for filtering. + + Returns: + A shallow copy of a full swh revision/commit object. + + """ + return commit['id'] + + +def shallow_tag(tag): + """Convert a full swh release/tag to just what's needed by + swh-storage for filtering. + + Returns: + A shallow copy of a full swh release/tag object. + + """ + return tag['id'] + + +class SWHLoader(config.SWHConfig): + """A svn loader. + + This will load the svn repository. + + """ + def __init__(self, config, revision_type, origin_id, logging_class): + self.config = config + + self.origin_id = origin_id + self.storage = get_storage(config['storage_class'], + config['storage_args']) + self.revision_type = revision_type + + self.log = logging.getLogger(logging_class) + + self.contents = QueuePerSizeAndNbUniqueElements( + key='sha1', + max_nb_elements=self.config['content_packet_size'], + max_size=self.config['content_packet_block_size_bytes']) + + self.contents_seen = set() + + self.directories = QueuePerNbUniqueElements( + key='id', + max_nb_elements=self.config['directory_packet_size']) + + self.directories_seen = set() + + self.revisions = QueuePerNbUniqueElements( + key='id', + max_nb_elements=self.config['revision_packet_size']) + + self.revisions_seen = set() + + self.releases = QueuePerNbUniqueElements( + key='id', + max_nb_elements=self.config['release_packet_size']) + + self.releases_seen = set() + + self.occurrences = QueuePerNbElements( + self.config['occurrence_packet_size']) + + l = logging.getLogger('requests.packages.urllib3.connectionpool') + l.setLevel(logging.WARN) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_contents(self, content_list): + """Actually send properly formatted contents to the database""" + num_contents = len(content_list) + if num_contents > 0: + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) + self.storage.content_add(content_list) + self.log.debug("Done sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_directories(self, directory_list): + """Actually send properly formatted directories to the database""" + num_directories = len(directory_list) + if num_directories > 0: + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d directories" % num_directories, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'directory', + 'swh_num': num_directories, + 'swh_id': log_id, + }) + self.storage.directory_add(directory_list) + self.log.debug("Done sending %d directories" % num_directories, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'directory', + 'swh_num': num_directories, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_revisions(self, revision_list): + """Actually send 
properly formatted revisions to the database""" + num_revisions = len(revision_list) + if num_revisions > 0: + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d revisions" % num_revisions, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'revision', + 'swh_num': num_revisions, + 'swh_id': log_id, + }) + self.storage.revision_add(revision_list) + self.log.debug("Done sending %d revisions" % num_revisions, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'revision', + 'swh_num': num_revisions, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_releases(self, release_list): + """Actually send properly formatted releases to the database""" + num_releases = len(release_list) + if num_releases > 0: + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d releases" % num_releases, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'release', + 'swh_num': num_releases, + 'swh_id': log_id, + }) + self.storage.release_add(release_list) + self.log.debug("Done sending %d releases" % num_releases, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'release', + 'swh_num': num_releases, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_occurrences(self, occurrence_list): + """Actually send properly formatted occurrences to the database""" + num_occurrences = len(occurrence_list) + if num_occurrences > 0: + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d occurrences" % num_occurrences, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'occurrence', + 'swh_num': num_occurrences, + 'swh_id': log_id, + }) + self.storage.occurrence_add(occurrence_list) + self.log.debug("Done sending %d occurrences" % num_occurrences, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'occurrence', + 'swh_num': num_occurrences, + 'swh_id': log_id, + }) + + def filter_missing_blobs(self, blobs): + """Filter missing blob from swh. + + """ + max_content_size = self.config['content_packet_size_bytes'] + blobs_per_sha1 = {} + shallow_blobs = [] + for key, blob in ((b['sha1'], b) for b in blobs + if b['sha1'] not in self.contents_seen): + blobs_per_sha1[key] = blob + shallow_blobs.append(shallow_blob(blob)) + self.contents_seen.add(key) + + for sha1 in self.storage.content_missing(shallow_blobs, + key_hash='sha1'): + yield converters.blob_to_content(blobs_per_sha1[sha1], + max_content_size=max_content_size, + origin_id=self.origin_id) + + def bulk_send_blobs(self, blobs): + """Format blobs as swh contents and send them to the database""" + threshold_reached = self.contents.add( + self.filter_missing_blobs(blobs)) + if threshold_reached: + self.send_contents(self.contents.pop()) + + def filter_missing_trees(self, trees, objects): + """Filter missing tree from swh. 
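+
+        Trees already handled during this run (tracked in
+        self.directories_seen) are skipped; the rest are checked against
+        storage.directory_missing and only the missing ones are yielded,
+        converted with converters.tree_to_directory.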
+ + """ + trees_per_sha1 = {} + shallow_trees = [] + for key, tree in ((t['sha1_git'], t) for t in trees + if t['sha1_git'] not in self.directories_seen): + trees_per_sha1[key] = tree + shallow_trees.append(shallow_tree(tree)) + self.directories_seen.add(key) + + for sha in self.storage.directory_missing(shallow_trees): + yield converters.tree_to_directory(trees_per_sha1[sha], objects) + + def bulk_send_trees(self, objects, trees): + """Format trees as swh directories and send them to the database""" + threshold_reached = self.directories.add( + self.filter_missing_trees(trees, objects)) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) + + def filter_missing_commits(self, commits): + """Filter missing commit from swh. + + """ + commits_per_sha1 = {} + shallow_commits = [] + for key, commit in ((c['id'], c) for c in commits + if c['id'] not in self.revisions_seen): + commits_per_sha1[key] = commit + shallow_commits.append(shallow_commit(commit)) + self.revisions_seen.add(key) + + for sha in self.storage.revision_missing(shallow_commits, + type=self.revision_type): + yield commits_per_sha1[sha] + + def bulk_send_commits(self, commits): + """Format commits as swh revisions and send them to the database. + + """ + threshold_reached = self.revisions.add( + self.filter_missing_commits(commits)) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) + self.send_revisions(self.revisions.pop()) + + def filter_missing_tags(self, tags): + """Filter missing tags from swh. + + """ + tags_per_sha1 = {} + shallow_tags = [] + for key, tag in ((t['id'], t) for t in tags + if t['id'] not in self.releases_seen): + tags_per_sha1[key] = tag + shallow_tags.append(shallow_tag(tag)) + self.releases_seen.add(key) + + for sha in self.storage.release_missing(shallow_tags, + type=self.revision_type): + yield tags_per_sha1[sha] + + def bulk_send_annotated_tags(self, tags): + """Format annotated tags (pygit2.Tag objects) as swh releases and send + them to the database. + + """ + threshold_reached = self.releases.add( + self.filter_missing_tags(tags)) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) + self.send_revisions(self.revisions.pop()) + self.send_releases(self.releases.pop()) + + def bulk_send_refs(self, refs): + """Format git references as swh occurrences and send them to the + database. 
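+
+        When the occurrence queue reaches its threshold, every pending
+        queue is flushed in dependency order (contents, directories,
+        revisions, releases, then occurrences) so that the objects an
+        occurrence points to are sent first.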
+ + """ + threshold_reached = self.occurrences.add( + (converters.ref_to_occurrence(r) for r in refs)) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) + self.send_revisions(self.revisions.pop()) + self.send_releases(self.releases.pop()) + self.send_occurrences(self.occurrences.pop()) + + def maybe_load_contents(self, contents): + if self.config['send_contents']: + self.bulk_send_blobs(contents) + else: + self.log.info('Not sending contents') + + def maybe_load_directories(self, trees, objects_per_path): + if self.config['send_directories']: + self.bulk_send_trees(objects_per_path, trees) + else: + self.log.info('Not sending directories') + + def maybe_load_revisions(self, revisions): + if self.config['send_revisions']: + self.bulk_send_commits(revisions) + else: + self.log.info('Not sending revisions') + + def maybe_load_releases(self, releases): + if self.config['send_releases']: + self.bulk_send_annotated_tags(releases) + else: + self.log.info('Not sending releases') + + def maybe_load_occurrences(self, occurrences): + if self.config['send_occurrences']: + self.bulk_send_refs(occurrences) + else: + self.log.info('Not sending occurrences') + + def load(self, objects_per_type, objects_per_path): + self.maybe_load_contents(objects_per_type[GitType.BLOB]) + self.maybe_load_directories(objects_per_type[GitType.TREE], + objects_per_path) + self.maybe_load_revisions(objects_per_type[GitType.COMM]) + self.maybe_load_releases(objects_per_type[GitType.RELE]) + self.maybe_load_occurrences(objects_per_type[GitType.REFS]) + + def flush(self): + if self.config['send_contents']: + self.send_contents(self.contents.pop()) + if self.config['send_directories']: + self.send_directories(self.directories.pop()) + if self.config['send_revisions']: + self.send_revisions(self.revisions.pop()) + if self.config['send_occurrences']: + self.send_occurrences(self.occurrences.pop()) + if self.config['send_releases']: + self.send_releases(self.releases.pop()) diff --git a/swh/loader/core/queue.py b/swh/loader/core/queue.py new file mode 100644 index 0000000..f32e354 --- /dev/null +++ b/swh/loader/core/queue.py @@ -0,0 +1,101 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +class QueuePerNbElements(): + """Basic queue which holds the nb of elements it contains. + + """ + def __init__(self, max_nb_elements): + self.reset() + self.max_nb_elements = max_nb_elements + + def add(self, elements): + if not isinstance(elements, list): + elements = list(elements) + self.elements.extend(elements) + self.count += len(elements) + return self.count >= self.max_nb_elements + + def pop(self): + elements = self.elements + self.reset() + return elements + + def reset(self): + self.elements = [] + self.count = 0 + + +class QueuePerSizeAndNbUniqueElements(): + """Queue which permits to add unknown elements and holds the current + size of the queue. 
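+
+    Elements are dicts; duplicates are detected on the configured key and
+    the cumulative size is taken from each element's 'length' field. A
+    small usage sketch (thresholds chosen for illustration only):
+
+        q = QueuePerSizeAndNbUniqueElements(max_nb_elements=2,
+                                            max_size=100, key='sha1')
+        q.add([{'sha1': 'a', 'length': 60}])  # False: no threshold reached
+        q.add([{'sha1': 'b', 'length': 60}])  # True: size 120 >= 100
+        q.pop()                               # returns both dicts and resets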
+ + """ + def __init__(self, max_nb_elements, max_size, key): + self.reset() + self.max_nb_elements = max_nb_elements + self.max_size = max_size + self.key = key + self.keys = set() + + def _add_element(self, e): + k = e[self.key] + if k not in self.keys: + self.keys.add(k) + self.elements.append(e) + self.size += e['length'] + self.count += 1 + + def add(self, elements): + for e in elements: + self._add_element(e) + return self.size >= self.max_size or \ + self.count >= self.max_nb_elements + + def pop(self): + elements = self.elements + self.reset() + return elements + + def reset(self): + self.elements = [] + self.keys = set() + self.size = 0 + self.count = 0 + + +class QueuePerNbUniqueElements(): + """Queue which permits to add unknown elements and knows the actual + count of elements it held. + + """ + def __init__(self, max_nb_elements, key): + self.reset() + self.max_nb_elements = max_nb_elements + self.key = key + self.keys = set() + + def _add_element(self, e): + k = e[self.key] + if k not in self.keys: + self.keys.add(k) + self.elements.append(e) + self.count += 1 + + def add(self, elements): + for e in elements: + self._add_element(e) + return self.count >= self.max_nb_elements + + def pop(self): + elements = self.elements + self.reset() + return elements + + def reset(self): + self.elements = [] + self.keys = set() + self.count = 0 diff --git a/swh/loader/core/tests/test_converters.py b/swh/loader/core/tests/test_converters.py new file mode 100644 index 0000000..0643953 --- /dev/null +++ b/swh/loader/core/tests/test_converters.py @@ -0,0 +1,337 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +import shutil +import tempfile +import unittest + +from nose.tools import istest + +from swh.loader.dir import converters +from swh.model import git + + +def tmpfile_with_content(fromdir, contentfile): + """Create a temporary file with content contentfile in directory fromdir. 
+ + """ + tmpfilepath = tempfile.mktemp( + suffix='.swh', + prefix='tmp-file-for-test', + dir=fromdir) + + with open(tmpfilepath, 'wb') as f: + f.write(contentfile) + + return tmpfilepath + + +class TestConverters(unittest.TestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.tmpdir = tempfile.mkdtemp(prefix='test-swh-loader-dir.') + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmpdir) + super().tearDownClass() + + @istest + def format_to_minutes(self): + self.assertEquals(converters.format_to_minutes('+0100'), 60) + self.assertEquals(converters.format_to_minutes('-0200'), -120) + self.assertEquals(converters.format_to_minutes('+1250'), 12*60+50) + self.assertEquals(converters.format_to_minutes('+0000'), 0) + self.assertEquals(converters.format_to_minutes('-0000'), 0) + + @istest + def annotated_tag_to_release(self): + # given + release = { + 'id': '123', + 'target': '456', + 'target_type': 'revision', + 'name': 'some-release', + 'comment': 'some-comment-on-release', + 'date': 1444054085, + 'offset': '-0300', + 'author_name': 'someone', + 'author_email': 'someone@whatelse.eu', + } + + expected_release = { + 'target': '456', + 'target_type': 'revision', + 'name': b'some-release', + 'message': b'some-comment-on-release', + 'date': { + 'timestamp': 1444054085, + 'offset': -180 + }, + 'author': { + 'name': b'someone', + 'email': b'someone@whatelse.eu', + }, + 'synthetic': True, + } + + # when + actual_release = converters.annotated_tag_to_release(release) + + # then + self.assertDictEqual(actual_release, expected_release) + + @istest + def blob_to_content_visible_data(self): + # given + contentfile = b'temp file for testing blob to content conversion' + tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile) + + obj = { + 'path': tmpfilepath, + 'perms': git.GitPerm.BLOB, + 'type': git.GitType.BLOB, + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + } + + expected_blob = { + 'data': contentfile, + 'length': len(contentfile), + 'status': 'visible', + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + 'perms': git.GitPerm.BLOB.value, + 'type': git.GitType.BLOB.value, + } + + # when + actual_blob = converters.blob_to_content(obj) + + # then + self.assertEqual(actual_blob, expected_blob) + + @istest + def blob_to_content_link(self): + # given + contentfile = b'temp file for testing blob to content conversion' + tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile) + tmplinkpath = tempfile.mktemp(dir=self.tmpdir) + os.symlink(tmpfilepath, tmplinkpath) + + obj = { + 'path': tmplinkpath, + 'perms': git.GitPerm.BLOB, + 'type': git.GitType.BLOB, + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + } + + expected_blob = { + 'data': contentfile, + 'length': len(tmpfilepath), + 'status': 'visible', + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + 'perms': git.GitPerm.BLOB.value, + 'type': git.GitType.BLOB.value, + } + + # when + actual_blob = converters.blob_to_content(obj) + + # then + self.assertEqual(actual_blob, expected_blob) + + @istest + def blob_to_content_link_with_data_length_populated(self): + # given + tmplinkpath = tempfile.mktemp(dir=self.tmpdir) + obj = { + 'length': 10, # wrong for test purposes + 'data': 'something wrong', # again for test purposes + 'path': tmplinkpath, + 'perms': git.GitPerm.BLOB, + 'type': git.GitType.BLOB, + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + } + + 
expected_blob = { + 'length': 10, + 'data': 'something wrong', + 'status': 'visible', + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + 'perms': git.GitPerm.BLOB.value, + 'type': git.GitType.BLOB.value, + } + + # when + actual_blob = converters.blob_to_content(obj) + + # then + self.assertEqual(actual_blob, expected_blob) + + @istest + def blob_to_content2_absent_data(self): + # given + contentfile = b'temp file for testing blob to content conversion' + tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile) + + obj = { + 'path': tmpfilepath, + 'perms': git.GitPerm.BLOB, + 'type': git.GitType.BLOB, + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + } + + expected_blob = { + 'length': len(contentfile), + 'status': 'absent', + 'sha1': 'some-sha1', + 'sha256': 'some-sha256', + 'sha1_git': 'some-sha1git', + 'perms': git.GitPerm.BLOB.value, + 'type': git.GitType.BLOB.value, + 'reason': 'Content too large', + 'origin': 190 + } + + # when + actual_blob = converters.blob_to_content(obj, None, + max_content_size=10, + origin_id=190) + + # then + self.assertEqual(actual_blob, expected_blob) + + @istest + def tree_to_directory_no_entries(self): + # given + tree = { + 'path': 'foo', + 'sha1_git': b'tree_sha1_git' + } + objects = { + 'foo': [{'type': git.GitType.TREE, + 'perms': git.GitPerm.TREE, + 'name': 'bar', + 'sha1_git': b'sha1-target'}, + {'type': git.GitType.BLOB, + 'perms': git.GitPerm.BLOB, + 'name': 'file-foo', + 'sha1_git': b'file-foo-sha1-target'}] + } + + expected_directory = { + 'id': b'tree_sha1_git', + 'entries': [{'type': 'dir', + 'perms': int(git.GitPerm.TREE.value), + 'name': 'bar', + 'target': b'sha1-target'}, + {'type': 'file', + 'perms': int(git.GitPerm.BLOB.value), + 'name': 'file-foo', + 'target': b'file-foo-sha1-target'}] + } + + # when + actual_directory = converters.tree_to_directory(tree, objects) + + # then + self.assertEqual(actual_directory, expected_directory) + + @istest + def commit_to_revision(self): + # given + commit = { + 'sha1_git': 'commit-git-sha1', + 'author_date': 1444054085, + 'author_offset': '+0000', + 'committer_date': 1444054085, + 'committer_offset': '-0000', + 'type': 'tar', + 'message': 'synthetic-message-input', + 'author_name': 'author-name', + 'author_email': 'author-email', + 'committer_name': 'committer-name', + 'committer_email': 'committer-email', + 'metadata': {'checksums': {'sha1': b'sha1-as-bytes'}}, + 'directory': 'targeted-tree-sha1', + } + + objects = { + git.ROOT_TREE_KEY: [{'sha1_git': 'targeted-tree-sha1'}] + } + + expected_revision = { + 'date': { + 'timestamp': 1444054085, + 'offset': 0, + }, + 'committer_date': { + 'timestamp': 1444054085, + 'offset': 0, + }, + 'type': 'tar', + 'directory': 'targeted-tree-sha1', + 'message': b'synthetic-message-input', + 'author': { + 'name': b'author-name', + 'email': b'author-email', + }, + 'committer': { + 'name': b'committer-name', + 'email': b'committer-email', + }, + 'synthetic': True, + 'metadata': {'checksums': {'sha1': b'sha1-as-bytes'}}, + 'parents': [], + } + + # when + actual_revision = converters.commit_to_revision(commit, objects) + + # then + self.assertEquals(actual_revision, expected_revision) + + @istest + def ref_to_occurrence_1(self): + # when + actual_occ = converters.ref_to_occurrence({ + 'id': 'some-id', + 'branch': 'some/branch' + }) + # then + self.assertEquals(actual_occ, { + 'id': 'some-id', + 'branch': b'some/branch' + }) + + @istest + def ref_to_occurrence_2(self): + # when + actual_occ = 
converters.ref_to_occurrence({ + 'id': 'some-id', + 'branch': b'some/branch' + }) + + # then + self.assertEquals(actual_occ, { + 'id': 'some-id', + 'branch': b'some/branch' + }) diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py new file mode 100644 index 0000000..2c5df5b --- /dev/null +++ b/swh/loader/core/tests/test_loader.py @@ -0,0 +1,85 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import unittest + +from nose.tools import istest + +from swh.loader.core import loader + + +class TestLoader(unittest.TestCase): + @istest + def shallow_blob(self): + # when + actual_blob = loader.shallow_blob({ + 'length': 1451, + 'sha1_git': + b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', + 'name': b'LDPCL', + 'type': b'blob', + 'sha256': + b'\xe6it!\x99\xb37UT\x8f\x0e\x8f\xd7o\x92"\xce\xa3\x1d\xd2\xe5D>M\xaaj/\x03\x138\xad\x1b', # noqa + 'perms': b'100644', + 'sha1': + b'.\x18Y\xd6M\x8c\x9a\xa4\xe1\xf1\xc7\x95\x082\xcf\xc9\xd8\nV)', + 'path': + b'/tmp/tmp.c86tq5o9.swh.loader/pkg-doc-linux/copyrights/non-free/LDPCL' # noqa + }) + + # then + self.assertEqual(actual_blob, { + 'sha1': + b'.\x18Y\xd6M\x8c\x9a\xa4\xe1\xf1\xc7\x95\x082\xcf\xc9\xd8\nV)', + 'sha1_git': + b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', + 'sha256': + b'\xe6it!\x99\xb37UT\x8f\x0e\x8f\xd7o\x92"\xce\xa3\x1d\xd2\xe5D>M\xaaj/\x03\x138\xad\x1b', # noqa + 'length': 1451, + }) + + @istest + def shallow_tree(self): + # when + actual_shallow_tree = loader.shallow_tree({ + 'length': 1451, + 'sha1_git': + b'tree-id', + 'type': b'tree', + 'sha256': + b'\xe6it!\x99\xb37UT\x8f\x0e\x8f\xd7o\x92"\xce\xa3\x1d\xd2\xe5D>M\xaaj/\x03\x138\xad\x1b', # noqa + 'perms': b'100644', + 'sha1': + b'.\x18Y\xd6M\x8c\x9a\xa4\xe1\xf1\xc7\x95\x082\xcf\xc9\xd8\nV)', + }) + + # then + self.assertEqual(actual_shallow_tree, b'tree-id') + + @istest + def shallow_commit(self): + # when + actual_shallow_commit = loader.shallow_commit({ + 'sha1_git': + b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', + 'type': b'commit', + 'id': b'let-me-see-some-id', + }) + + # then + self.assertEqual(actual_shallow_commit, b'let-me-see-some-id') + + @istest + def shallow_tag(self): + # when + actual_shallow_tag = loader.shallow_tag({ + 'sha1': + b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', + 'type': b'tag', + 'id': b'this-is-not-the-id-you-are-looking-for', + }) + + # then + self.assertEqual(actual_shallow_tag, b'this-is-not-the-id-you-are-looking-for') # noqa diff --git a/swh/loader/core/tests/test_queue.py b/swh/loader/core/tests/test_queue.py new file mode 100644 index 0000000..c036868 --- /dev/null +++ b/swh/loader/core/tests/test_queue.py @@ -0,0 +1,141 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import unittest + +from nose.tools import istest + +from swh.loader.core.queue import QueuePerNbElements +from swh.loader.core.queue import QueuePerNbUniqueElements +from swh.loader.core.queue import QueuePerSizeAndNbUniqueElements + + +class TestQueuePerNbElements(unittest.TestCase): + @istest + def simple_queue_behavior(self): + max_nb_elements = 10 + 
queue = QueuePerNbElements(max_nb_elements=max_nb_elements) + + elements = [1, 3, 4, 9, 20, 30, 40] + actual_threshold = queue.add(elements) + + self.assertFalse(actual_threshold, len(elements) > max_nb_elements) + + # pop returns the content and reset the queue + actual_elements = queue.pop() + self.assertEquals(actual_elements, elements) + self.assertEquals(queue.pop(), []) + + # duplicates can be integrated + new_elements = [1, 1, 3, 4, 9, 20, 30, 40, 12, 14, 2] + actual_threshold = queue.add(new_elements) + + self.assertTrue(actual_threshold) + self.assertEquals(queue.pop(), new_elements) + + # reset is destructive too + queue.add(new_elements) + queue.reset() + + self.assertEquals(queue.pop(), []) + + +def to_some_objects(elements, key): + for elt in elements: + yield {key: elt} + + +class TestQueuePerNbUniqueElements(unittest.TestCase): + @istest + def queue_with_unique_key_behavior(self): + max_nb_elements = 5 + queue = QueuePerNbUniqueElements(max_nb_elements=max_nb_elements, + key='id') + + # no duplicates + elements = list(to_some_objects([1, 1, 3, 4, 9], key='id')) + actual_threshold = queue.add(elements) + + self.assertFalse(actual_threshold, len(elements) > max_nb_elements) + + # pop returns the content and reset the queue + actual_elements = queue.pop() + self.assertEquals(actual_elements, + [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}]) + self.assertEquals(queue.pop(), []) + + new_elements = list(to_some_objects( + [1, 3, 4, 9, 20], + key='id')) + actual_threshold = queue.add(new_elements) + + self.assertTrue(actual_threshold) + + # reset is destructive too + queue.add(new_elements) + queue.reset() + + self.assertEquals(queue.pop(), []) + + +def to_some_complex_objects(elements, key): + for elt, size in elements: + yield {key: elt, 'length': size} + + +class TestQueuePerSizeAndNbUniqueElements(unittest.TestCase): + @istest + def queue_with_unique_key_and_size_behavior(self): + max_nb_elements = 5 + max_size = 100 + queue = QueuePerSizeAndNbUniqueElements( + max_nb_elements=max_nb_elements, + max_size=max_size, + key='k') + + # size total exceeded, nb elements not reached, still the + # threshold is deemed reached + elements = list(to_some_complex_objects([(1, 10), + (2, 20), + (3, 30), + (4, 100)], key='k')) + actual_threshold = queue.add(elements) + + self.assertTrue(actual_threshold) + + # pop returns the content and reset the queue + actual_elements = queue.pop() + self.assertEquals(actual_elements, + [{'k': 1, 'length': 10}, + {'k': 2, 'length': 20}, + {'k': 3, 'length': 30}, + {'k': 4, 'length': 100}]) + self.assertEquals(queue.pop(), []) + + # size threshold not reached, nb elements reached, the + # threshold is considered reached + new_elements = list(to_some_complex_objects( + [(1, 10), (3, 5), (4, 2), (9, 1), (20, 0)], + key='k')) + actual_threshold = queue.add(new_elements) + + queue.reset() + + self.assertTrue(actual_threshold) + + # nb elements threshold not reached, nor the top number of + # elements, the threshold is not reached + new_elements = list(to_some_complex_objects( + [(1, 10)], + key='k')) + actual_threshold = queue.add(new_elements) + + self.assertFalse(actual_threshold) + + # reset is destructive too + queue.add(new_elements) + queue.reset() + + self.assertEquals(queue.pop(), [])
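A minimal usage sketch of the new SWHLoader: a concrete loader instantiates it with a configuration carrying the storage settings, the packet-size thresholds and the send_* flags read in __init__(), load() and flush(), then feeds it objects grouped by GitType. Every value below (storage class and URL, packet sizes, origin id, revision type, logging class) is an illustrative assumption, not something mandated by this code:

    from swh.model.git import GitType
    from swh.loader.core.loader import SWHLoader

    config = {
        'storage_class': 'remote_storage',           # assumption: remote swh-storage
        'storage_args': ['http://localhost:5002/'],  # assumption: local API endpoint
        'content_packet_size': 10000,
        'content_packet_size_bytes': 1024 * 1024 * 1024,
        'content_packet_block_size_bytes': 100 * 1024 * 1024,
        'directory_packet_size': 25000,
        'revision_packet_size': 100000,
        'release_packet_size': 100000,
        'occurrence_packet_size': 100000,
        'send_contents': True,
        'send_directories': True,
        'send_revisions': True,
        'send_releases': True,
        'send_occurrences': True,
    }

    loader = SWHLoader(config, revision_type='tar',  # 'tar' as in the converter tests
                       origin_id=1, logging_class='swh.loader.example')

    # objects_per_type maps each GitType to the list of parsed objects;
    # objects_per_path maps tree paths to their entries (both empty here,
    # so nothing is actually sent).
    objects_per_type = {t: [] for t in (GitType.BLOB, GitType.TREE,
                                        GitType.COMM, GitType.RELE,
                                        GitType.REFS)}
    objects_per_path = {}

    loader.load(objects_per_type, objects_per_path)
    loader.flush()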