diff --git a/swh/loader/core/converters.py b/swh/loader/core/converters.py index 7e5bcfe..c172137 100644 --- a/swh/loader/core/converters.py +++ b/swh/loader/core/converters.py @@ -1,154 +1,41 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Convert objects to dictionaries suitable for swh.storage""" -import os - from swh.model.hashutil import hash_to_hex -from swh.model import git - -def blob_to_content(obj, log=None, max_content_size=None, - origin_id=None): - """Convert obj to a swh storage content. +def content_for_storage(content, log=None, max_content_size=None, + origin_id=None): + """Prepare content to be ready for storage Note: - - If obj represents a link, the length and data are already - provided so we use them directly. - 'data' is returned only if max_content_size is not reached. Returns: - obj converted to content as a dictionary. + content with added data (or reason for being missing) """ - filepath = obj['path'] - if 'length' in obj: # link already has it - size = obj['length'] - else: - size = os.lstat(filepath).st_size - - ret = { - 'sha1': obj['sha1'], - 'sha256': obj['sha256'], - 'sha1_git': obj['sha1_git'], - 'blake2s256': obj['blake2s256'], - 'length': size, - 'perms': obj['perms'].value, - 'type': obj['type'].value, - } + ret = content.copy() - if max_content_size and size > max_content_size: + if max_content_size and ret['length'] > max_content_size: if log: log.info('Skipping content %s, too large (%s > %s)' % - (hash_to_hex(obj['sha1_git']), - size, + (hash_to_hex(content['sha1_git']), + ret['length'], max_content_size)) + ret.pop('data', None) ret.update({'status': 'absent', 'reason': 'Content too large', 'origin': origin_id}) return ret - if 'data' in obj: # link already has it - data = obj['data'] - else: - data = open(filepath, 'rb').read() + if 'data' not in ret: + ret['data'] = open(ret['path'], 'rb').read() - ret.update({ - 'data': data, - 'status': 'visible' - }) + ret['status'] = 'visible' return ret - - -# Map of type to swh types -_entry_type_map = { - git.GitType.TREE: 'dir', - git.GitType.BLOB: 'file', - git.GitType.COMM: 'rev', -} - - -def tree_to_directory(tree, log=None): - """Format a tree as a directory - - """ - entries = [] - for entry in tree['children']: - entries.append({ - 'type': _entry_type_map[entry['type']], - 'perms': int(entry['perms'].value), - 'name': entry['name'], - 'target': entry['sha1_git'] - }) - - return { - 'id': tree['sha1_git'], - 'entries': entries - } - - -def ref_to_occurrence(ref): - """Format a reference as an occurrence""" - occ = ref.copy() - if 'branch' in ref: - branch = ref['branch'] - if isinstance(branch, str): - occ['branch'] = branch.encode('utf-8') - else: - occ['branch'] = branch - return occ - - -def shallow_blob(obj): - """Convert a full swh content/blob to just what's needed by - swh-storage for filtering. - - Returns: - A shallow copy of a full swh content/blob object. - - """ - return { - 'sha1': obj['sha1'], - 'sha256': obj['sha256'], - 'sha1_git': obj['sha1_git'], - 'blake2s256': obj['blake2s256'], - 'length': obj['length'] - } - - -def shallow_tree(tree): - """Convert a full swh directory/tree to just what's needed by - swh-storage for filtering. - - Returns: - A shallow copy of a full swh directory/tree object. 
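# --- Illustrative sketch (review note, not part of the patch) ---
# How the new content_for_storage() helper behaves on both sides of the size
# threshold. The hash values and sizes below are made-up placeholders.
from swh.loader.core.converters import content_for_storage

small = {'sha1': b'\x01' * 20, 'sha1_git': b'\x02' * 20,
         'sha256': b'\x03' * 32, 'blake2s256': b'\x04' * 32,
         'length': 4, 'data': b'blob'}
ready = content_for_storage(small, max_content_size=1024 * 1024)
assert ready['status'] == 'visible' and ready['data'] == b'blob'

# Over the threshold: 'data' is dropped and the content is flagged absent.
big = dict(small, length=2 * 1024 * 1024)
skipped = content_for_storage(big, max_content_size=1024 * 1024, origin_id=1)
assert skipped['status'] == 'absent' and 'data' not in skipped
assert skipped['reason'] == 'Content too large'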
- - """ - return tree['sha1_git'] - - -def shallow_commit(commit): - """Convert a full swh revision/commit to just what's needed by - swh-storage for filtering. - - Returns: - A shallow copy of a full swh revision/commit object. - - """ - return commit['id'] - - -def shallow_tag(tag): - """Convert a full swh release/tag to just what's needed by - swh-storage for filtering. - - Returns: - A shallow copy of a full swh release/tag object. - - """ - return tag['id'] diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py index a1438c9..46b9131 100644 --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -1,640 +1,639 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import psycopg2 import requests import traceback import uuid from abc import ABCMeta, abstractmethod from retrying import retry from . import converters from swh.core import config from swh.storage import get_storage from .queue import QueuePerSizeAndNbUniqueElements from .queue import QueuePerNbUniqueElements from .queue import QueuePerNbElements def retry_loading(error): """Retry policy when the database raises an integrity error""" exception_classes = [ # raised when two parallel insertions insert the same data. psycopg2.IntegrityError, # raised when uWSGI restarts and hungs up on the worker. requests.exceptions.ConnectionError, ] if not any(isinstance(error, exc) for exc in exception_classes): return False logger = logging.getLogger('swh.loader') error_name = error.__module__ + '.' + error.__class__.__name__ logger.warning('Retry loading a batch', exc_info=False, extra={ 'swh_type': 'storage_retry', 'swh_exception_type': error_name, 'swh_exception': traceback.format_exception( error.__class__, error, error.__traceback__, ), }) return True class SWHLoader(config.SWHConfig, metaclass=ABCMeta): """Mixin base class for loader. To use this class, you must: - inherit from this class - and implement the @abstractmethod methods :func:`cleanup`: Last step executed by the loader. :func:`prepare`: First step executed by the loader to prepare some state needed by the `func`:load method. :func:`get_origin`: Retrieve the origin that is currently being loaded. :func:`fetch_data`: Fetch the data is actually the method to implement to compute data to inject in swh (through the store_data method) :func:`store_data`: Store data fetched. 
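# --- Illustrative sketch (review note, not part of the patch) ---
# The smallest concrete shape the abstract interface above allows. Everything
# here (MinimalLoader, its origin URL, the empty payload, the config file
# name) is hypothetical and only shows which hooks load() drives.
class MinimalLoader(SWHLoader):
    CONFIG_BASE_FILENAME = 'loader/minimal'  # hypothetical config file name

    def prepare(self, origin_url, visit_date=None):
        self.origin_url = origin_url
        self.visit_date = visit_date  # load() falls back to "now" when None

    def get_origin(self):
        return {'type': 'minimal', 'url': self.origin_url}

    def fetch_data(self):
        self.data = []  # nothing to fetch in this sketch

    def store_data(self):
        self.maybe_load_contents(self.data)

    def cleanup(self):
        pass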
You can take a look at some example classes: :class:BaseSvnLoader :class:TarLoader :class:DirLoader """ CONFIG_BASE_FILENAME = None DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', } }), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), # Number of contents 'content_packet_size': ('int', 10000), # If this size threshold is reached, the content is condidered missing # in swh-storage 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), # packet of 100Mib contents 'content_packet_block_size_bytes': ('int', 100 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } ADDITIONAL_CONFIG = {} def __init__(self, logging_class, config=None): if config: self.config = config else: self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.storage = get_storage(**self.config['storage']) self.log = logging.getLogger(logging_class) self.contents = QueuePerSizeAndNbUniqueElements( key='sha1', max_nb_elements=self.config['content_packet_size'], max_size=self.config['content_packet_block_size_bytes']) self.contents_seen = set() self.directories = QueuePerNbUniqueElements( key='id', max_nb_elements=self.config['directory_packet_size']) self.directories_seen = set() self.revisions = QueuePerNbUniqueElements( key='id', max_nb_elements=self.config['revision_packet_size']) self.revisions_seen = set() self.releases = QueuePerNbUniqueElements( key='id', max_nb_elements=self.config['release_packet_size']) self.releases_seen = set() self.occurrences = QueuePerNbElements( self.config['occurrence_packet_size']) l = logging.getLogger('requests.packages.urllib3.connectionpool') l.setLevel(logging.WARN) self.counters = { 'contents': 0, 'directories': 0, 'revisions': 0, 'releases': 0, 'occurrences': 0, } @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_origin(self, origin): log_id = str(uuid.uuid4()) self.log.debug('Creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) origin_id = self.storage.origin_add_one(origin) self.log.debug('Done creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) return origin_id @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_origin_visit(self, origin_id, visit_date): log_id = str(uuid.uuid4()) self.log.debug( 'Creating origin_visit for origin %s at time %s' % ( origin_id, visit_date), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin_visit', 'swh_num': 1, 'swh_id': log_id }) origin_visit = self.storage.origin_visit_add(origin_id, visit_date) self.log.debug( 'Done Creating origin_visit for origin %s at time %s' % ( origin_id, visit_date), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin_visit', 'swh_num': 1, 'swh_id': log_id }) return origin_visit @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def update_origin_visit(self, origin_id, visit, status): log_id = str(uuid.uuid4()) self.log.debug( 'Updating origin_visit for origin %s with status %s' % ( origin_id, status), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 
'origin_visit', 'swh_num': 1, 'swh_id': log_id }) self.storage.origin_visit_update(origin_id, visit, status) self.log.debug( 'Done updating origin_visit for origin %s with status %s' % ( origin_id, status), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin_visit', 'swh_num': 1, 'swh_id': log_id }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_contents(self, content_list): """Actually send properly formatted contents to the database. """ num_contents = len(content_list) if num_contents > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) self.storage.content_add(content_list) self.counters['contents'] += num_contents self.log.debug("Done sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_directories(self, directory_list): """Actually send properly formatted directories to the database. """ num_directories = len(directory_list) if num_directories > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) self.storage.directory_add(directory_list) self.counters['directories'] += num_directories self.log.debug("Done sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_revisions(self, revision_list): """Actually send properly formatted revisions to the database. """ num_revisions = len(revision_list) if num_revisions > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) self.storage.revision_add(revision_list) self.counters['revisions'] += num_revisions self.log.debug("Done sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_releases(self, release_list): """Actually send properly formatted releases to the database. """ num_releases = len(release_list) if num_releases > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) self.storage.release_add(release_list) self.counters['releases'] += num_releases self.log.debug("Done sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_occurrences(self, occurrence_list): """Actually send properly formatted occurrences to the database. 
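# --- Illustrative sketch (review note, not part of the patch) ---
# All the send_* helpers in this class share the same retry policy: they are
# wrapped with retrying.retry() and the retry_loading predicate defined at the
# top of this module. `store_batch` below is a hypothetical standalone helper,
# shown only to make that decorator usage explicit outside the class.
from retrying import retry

@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def store_batch(storage, contents):
    # Retried (up to 3 attempts) only for psycopg2.IntegrityError and
    # requests.exceptions.ConnectionError; retry_loading returns False for
    # anything else, so other exceptions propagate immediately.
    storage.content_add(contents)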
""" num_occurrences = len(occurrence_list) if num_occurrences > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) self.storage.occurrence_add(occurrence_list) self.counters['occurrences'] += num_occurrences self.log.debug("Done sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) - def filter_missing_blobs(self, blobs): - """Filter missing blobs from swh. - - """ + def filter_missing_contents(self, contents): + """Return only the contents missing from swh""" max_content_size = self.config['content_packet_size_bytes'] - blobs_per_sha1 = {} - shallow_blobs = [] - for key, blob in ((b['sha1'], b) for b in blobs - if b['sha1'] not in self.contents_seen): - blobs_per_sha1[key] = blob - shallow_blobs.append(converters.shallow_blob(blob)) + contents_per_key = {} + content_key = 'blake2s256' + + for content in contents: + if content[content_key] in self.contents_seen: + continue + key = content[content_key] + contents_per_key[key] = content self.contents_seen.add(key) - for sha1 in self.storage.content_missing(shallow_blobs, - key_hash='sha1'): - yield converters.blob_to_content(blobs_per_sha1[sha1], - max_content_size=max_content_size, - origin_id=self.origin_id) + for key in self.storage.content_missing( + list(contents_per_key.values()), + key_hash=content_key + ): + yield converters.content_for_storage( + contents_per_key[key], + max_content_size=max_content_size, + origin_id=self.origin_id, + ) - def bulk_send_blobs(self, blobs): - """Format blobs as swh contents and send them to the database. + def bulk_send_contents(self, contents): + """Format contents as swh contents and send them to the database. """ threshold_reached = self.contents.add( - self.filter_missing_blobs(blobs)) + self.filter_missing_contents(contents)) if threshold_reached: self.send_contents(self.contents.pop()) - def filter_missing_trees(self, trees): - """Filter missing tree from swh. + def filter_missing_directories(self, directories): + """Return only directories missing from swh""" - """ - trees_per_sha1 = {} - shallow_trees = [] - for key, tree in ((t['sha1_git'], t) for t in trees - if t['sha1_git'] not in self.directories_seen): - trees_per_sha1[key] = tree - shallow_trees.append(converters.shallow_tree(tree)) - self.directories_seen.add(key) + directories_per_id = {} + search_dirs = [] - for sha in self.storage.directory_missing(shallow_trees): - t = trees_per_sha1[sha] - yield converters.tree_to_directory(t) + for directory in directories: + dir_id = directory['id'] + if dir_id in self.directories_seen: + continue - def bulk_send_trees(self, trees): - """Format trees as swh directories and send them to the database. + search_dirs.append(dir_id) + directories_per_id[dir_id] = directory + self.directories_seen.add(dir_id) - """ + for dir_id in self.storage.directory_missing(search_dirs): + yield directories_per_id[dir_id] + + def bulk_send_directories(self, directories): + """Send missing directories to the database""" threshold_reached = self.directories.add( - self.filter_missing_trees(trees)) + self.filter_missing_directories(directories)) if threshold_reached: self.send_contents(self.contents.pop()) self.send_directories(self.directories.pop()) - def filter_missing_commits(self, commits): - """Filter missing commit from swh. 
+ def filter_missing_revisions(self, revisions): + """Return only revisions missing from swh""" + revisions_per_id = {} + search_revs = [] - """ - commits_per_sha1 = {} - shallow_commits = [] - for key, commit in ((c['id'], c) for c in commits - if c['id'] not in self.revisions_seen): - commits_per_sha1[key] = commit - shallow_commits.append(converters.shallow_commit(commit)) - self.revisions_seen.add(key) + for revision in revisions: + rev_id = revision['id'] + if rev_id in self.revisions_seen: + continue - for sha in self.storage.revision_missing(shallow_commits): - yield commits_per_sha1[sha] + search_revs.append(rev_id) + revisions_per_id[rev_id] = revision + self.revisions_seen.add(rev_id) - def bulk_send_commits(self, commits): - """Format commits as swh revisions and send them to the database. + for rev_id in self.storage.revision_missing(search_revs): + yield revisions_per_id[rev_id] - """ + def bulk_send_revisions(self, revisions): + """Send missing revisions to the database""" threshold_reached = self.revisions.add( - self.filter_missing_commits(commits)) + self.filter_missing_revisions(revisions)) if threshold_reached: self.send_contents(self.contents.pop()) self.send_directories(self.directories.pop()) self.send_revisions(self.revisions.pop()) - def filter_missing_tags(self, tags): - """Filter missing tags from swh. + def filter_missing_releases(self, releases): + """Return only releases missing from swh""" + releases_per_id = {} + search_rels = [] - """ - tags_per_sha1 = {} - shallow_tags = [] - for key, tag in ((t['id'], t) for t in tags - if t['id'] not in self.releases_seen): - tags_per_sha1[key] = tag - shallow_tags.append(converters.shallow_tag(tag)) - self.releases_seen.add(key) + for release in releases: + rel_id = release['id'] + if rel_id in self.releases_seen: + continue - for sha in self.storage.release_missing(shallow_tags): - yield tags_per_sha1[sha] + search_rels.append(rel_id) + releases_per_id[rel_id] = release + self.releases_seen.add(rel_id) - def bulk_send_annotated_tags(self, tags): - """Format annotated tags (pygit2.Tag objects) as swh releases and send - them to the database. + for rel_id in self.storage.release_missing(search_rels): + yield releases_per_id[rel_id] - """ + def bulk_send_releases(self, releases): + """Send missing releases to the database""" threshold_reached = self.releases.add( - self.filter_missing_tags(tags)) + self.filter_missing_releases(releases)) if threshold_reached: self.send_contents(self.contents.pop()) self.send_directories(self.directories.pop()) self.send_revisions(self.revisions.pop()) self.send_releases(self.releases.pop()) - def bulk_send_refs(self, refs): - """Format git references as swh occurrences and send them to the - database. - - """ - threshold_reached = self.occurrences.add( - (converters.ref_to_occurrence(r) for r in refs)) + def bulk_send_occurrences(self, occurrences): + """Send the occurrences to the SWH archive""" + threshold_reached = self.occurrences.add(occurrences) if threshold_reached: self.send_contents(self.contents.pop()) self.send_directories(self.directories.pop()) self.send_revisions(self.revisions.pop()) self.send_releases(self.releases.pop()) self.send_occurrences(self.occurrences.pop()) def maybe_load_contents(self, contents): """Load contents in swh-storage if need be. 
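# --- Illustrative sketch (review note, not part of the patch) ---
# The filter_missing_* methods above all follow the same pattern: drop what
# this run has already seen, then keep only what the archive reports as
# missing. FakeStorage is a stand-in for swh.storage, used only to make the
# data flow explicit; the blake2s256 keys are placeholder bytes.
class FakeStorage:
    def __init__(self, known_keys):
        self.known = set(known_keys)

    def content_missing(self, contents, key_hash='sha1'):
        # swh-storage yields the keys it does not already hold
        return [c[key_hash] for c in contents if c[key_hash] not in self.known]

storage = FakeStorage(known_keys={b'k1'})
seen = set()
batch = [{'blake2s256': b'k1', 'length': 1},
         {'blake2s256': b'k2', 'length': 2},
         {'blake2s256': b'k2', 'length': 2}]  # duplicate, filtered by `seen`

per_key = {}
for content in batch:
    key = content['blake2s256']
    if key in seen:
        continue
    per_key[key] = content
    seen.add(key)

missing = storage.content_missing(list(per_key.values()), key_hash='blake2s256')
assert missing == [b'k2']  # only the truly new content is prepared for storage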
""" if self.config['send_contents']: - self.bulk_send_blobs(contents) + self.bulk_send_contents(contents) - def maybe_load_directories(self, trees): + def maybe_load_directories(self, directories): """Load directories in swh-storage if need be. """ if self.config['send_directories']: - self.bulk_send_trees(trees) + self.bulk_send_directories(directories) def maybe_load_revisions(self, revisions): """Load revisions in swh-storage if need be. """ if self.config['send_revisions']: - self.bulk_send_commits(revisions) + self.bulk_send_revisions(revisions) def maybe_load_releases(self, releases): """Load releases in swh-storage if need be. """ if self.config['send_releases']: - self.bulk_send_annotated_tags(releases) + self.bulk_send_releases(releases) def maybe_load_occurrences(self, occurrences): """Load occurrences in swh-storage if need be. """ if self.config['send_occurrences']: - self.bulk_send_refs(occurrences) + self.bulk_send_occurrences(occurrences) def open_fetch_history(self): return self.storage.fetch_history_start(self.origin_id) def close_fetch_history_success(self, fetch_history_id): data = { 'status': True, 'result': self.counters, } return self.storage.fetch_history_end(fetch_history_id, data) def close_fetch_history_failure(self, fetch_history_id): import traceback data = { 'status': False, 'stderr': traceback.format_exc(), } if self.counters['contents'] > 0 or \ self.counters['directories'] or \ self.counters['revisions'] > 0 or \ self.counters['releases'] > 0 or \ self.counters['occurrences'] > 0: data['result'] = self.counters return self.storage.fetch_history_end(fetch_history_id, data) def flush(self): """Flush any potential dangling data not sent to swh-storage. """ contents = self.contents.pop() directories = self.directories.pop() revisions = self.revisions.pop() occurrences = self.occurrences.pop() releases = self.releases.pop() # and send those to storage if asked if self.config['send_contents']: self.send_contents(contents) if self.config['send_directories']: self.send_directories(directories) if self.config['send_revisions']: self.send_revisions(revisions) if self.config['send_occurrences']: self.send_occurrences(occurrences) if self.config['send_releases']: self.send_releases(releases) @abstractmethod def cleanup(self): """Last step executed by the loader. """ pass @abstractmethod def prepare(self, *args, **kwargs): """First step executed by the loader to prepare some state needed by the loader. """ pass @abstractmethod def get_origin(self): """Get the origin that is currently being loaded. """ pass @abstractmethod def fetch_data(self): """Fetch the data we want to store. """ pass @abstractmethod def store_data(self): """Store the data we actually fetched. """ pass def load(self, *args, **kwargs): """Loading logic for the loader to follow: 1. def prepare(\*args, \**kwargs): Prepare any eventual state 2. def get_origin(): Get the origin we work with and store 3. def fetch_data(): Fetch the data to store 4. def store_data(): Store the data 5. def cleanup(): Clean up any eventual state put in place in prepare method. 
""" self.prepare(*args, **kwargs) origin = self.get_origin() self.origin_id = self.send_origin(origin) fetch_history_id = self.open_fetch_history() if self.visit_date: # overwriting the visit_date if provided visit_date = self.visit_date else: visit_date = datetime.datetime.now(tz=datetime.timezone.utc) origin_visit = self.send_origin_visit( self.origin_id, visit_date) self.visit = origin_visit['visit'] try: self.fetch_data() self.store_data() self.close_fetch_history_success(fetch_history_id) self.update_origin_visit( self.origin_id, self.visit, status='full') except Exception: self.log.exception('Loading failure, updating to `partial` status') self.close_fetch_history_failure(fetch_history_id) self.update_origin_visit( self.origin_id, self.visit, status='partial') finally: self.flush() self.cleanup() diff --git a/swh/loader/core/tests/test_converters.py b/swh/loader/core/tests/test_converters.py index 8035e14..b719f32 100644 --- a/swh/loader/core/tests/test_converters.py +++ b/swh/loader/core/tests/test_converters.py @@ -1,319 +1,104 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os -import shutil import tempfile import unittest +from unittest.mock import Mock from nose.tools import istest +from swh.model.from_disk import Content from swh.loader.core import converters -from swh.model import git def tmpfile_with_content(fromdir, contentfile): """Create a temporary file with content contentfile in directory fromdir. """ tmpfilepath = tempfile.mktemp( suffix='.swh', prefix='tmp-file-for-test', dir=fromdir) with open(tmpfilepath, 'wb') as f: f.write(contentfile) return tmpfilepath -class TestConverters(unittest.TestCase): +class TestContentForStorage(unittest.TestCase): + maxDiff = None - @classmethod - def setUpClass(cls): + def setUp(self): super().setUpClass() - cls.tmpdir = tempfile.mkdtemp(prefix='test-swh-loader-dir.') + self.tmpdir = tempfile.TemporaryDirectory( + prefix='test-swh-loader-core.' 
+ ) - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tmpdir) - super().tearDownClass() + def tearDown(self): + self.tmpdir.cleanup() @istest - def blob_to_content_visible_data(self): + def content_for_storage_path(self): # given - contentfile = b'temp file for testing blob to content conversion' - tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile) + data = b'temp file for testing content storage conversion' + tmpfile = tmpfile_with_content(self.tmpdir.name, data) - obj = { - 'path': tmpfilepath, - 'perms': git.GitPerm.BLOB, - 'type': git.GitType.BLOB, - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'blake2s256': 'some-blak2s256', - 'sha1_git': 'some-sha1git', - } + obj = Content.from_file(path=os.fsdecode(tmpfile), + save_path=True).get_data() - expected_blob = { - 'data': contentfile, - 'length': len(contentfile), - 'status': 'visible', - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'blake2s256': 'some-blak2s256', - 'sha1_git': 'some-sha1git', - 'perms': git.GitPerm.BLOB.value, - 'type': git.GitType.BLOB.value, - } + expected_content = obj.copy() + expected_content['data'] = data + expected_content['status'] = 'visible' # when - actual_blob = converters.blob_to_content(obj) + content = converters.content_for_storage(obj) # then - self.assertEqual(actual_blob, expected_blob) + self.assertEqual(content, expected_content) @istest - def blob_to_content_link(self): + def content_for_storage_data(self): # given - contentfile = b'temp file for testing blob to content conversion' - tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile) - tmplinkpath = tempfile.mktemp(dir=self.tmpdir) - os.symlink(tmpfilepath, tmplinkpath) + data = b'temp file for testing content storage conversion' - obj = { - 'path': tmplinkpath, - 'perms': git.GitPerm.BLOB, - 'type': git.GitType.BLOB, - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'sha1_git': 'some-sha1git', - 'blake2s256': 'some-blak2s256', - } + obj = Content.from_bytes(data=data, mode=0o100644).get_data() - expected_blob = { - 'data': contentfile, - 'length': len(tmpfilepath), - 'status': 'visible', - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'sha1_git': 'some-sha1git', - 'blake2s256': 'some-blak2s256', - 'perms': git.GitPerm.BLOB.value, - 'type': git.GitType.BLOB.value, - } + expected_content = obj.copy() + expected_content['status'] = 'visible' # when - actual_blob = converters.blob_to_content(obj) + content = converters.content_for_storage(obj) # then - self.assertEqual(actual_blob, expected_blob) + self.assertEqual(content, expected_content) @istest - def blob_to_content_link_with_data_length_populated(self): + def content_for_storage_too_long(self): # given - tmplinkpath = tempfile.mktemp(dir=self.tmpdir) - obj = { - 'length': 10, # wrong for test purposes - 'data': 'something wrong', # again for test purposes - 'path': tmplinkpath, - 'perms': git.GitPerm.BLOB, - 'type': git.GitType.BLOB, - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'sha1_git': 'some-sha1git', - 'blake2s256': 'some-blak2s256', - } + data = b'temp file for testing content storage conversion' - expected_blob = { - 'length': 10, - 'data': 'something wrong', - 'status': 'visible', - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'sha1_git': 'some-sha1git', - 'blake2s256': 'some-blak2s256', - 'perms': git.GitPerm.BLOB.value, - 'type': git.GitType.BLOB.value, - } + obj = Content.from_bytes(data=data, mode=0o100644).get_data() - # when - actual_blob = converters.blob_to_content(obj) - - # then - self.assertEqual(actual_blob, 
expected_blob) - - @istest - def blob_to_content2_absent_data(self): - # given - contentfile = b'temp file for testing blob to content conversion' - tmpfilepath = tmpfile_with_content(self.tmpdir, contentfile) - - obj = { - 'path': tmpfilepath, - 'perms': git.GitPerm.BLOB, - 'type': git.GitType.BLOB, - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'sha1_git': 'some-sha1git', - 'blake2s256': 'some-blak2s256', - } - - expected_blob = { - 'length': len(contentfile), - 'status': 'absent', - 'sha1': 'some-sha1', - 'sha256': 'some-sha256', - 'sha1_git': 'some-sha1git', - 'blake2s256': 'some-blak2s256', - 'perms': git.GitPerm.BLOB.value, - 'type': git.GitType.BLOB.value, - 'reason': 'Content too large', - 'origin': 190 - } - - # when - actual_blob = converters.blob_to_content(obj, None, - max_content_size=10, - origin_id=190) - - # then - self.assertEqual(actual_blob, expected_blob) - - @istest - def tree_to_directory_no_entries(self): - # given - tree = { - 'path': 'foo', - 'sha1_git': b'tree_sha1_git', - 'children': [{'type': git.GitType.TREE, - 'perms': git.GitPerm.TREE, - 'name': 'bar', - 'sha1_git': b'sha1-target'}, - {'type': git.GitType.BLOB, - 'perms': git.GitPerm.BLOB, - 'name': 'file-foo', - 'sha1_git': b'file-foo-sha1-target'}] - } - - expected_directory = { - 'id': b'tree_sha1_git', - 'entries': [{'type': 'dir', - 'perms': int(git.GitPerm.TREE.value), - 'name': 'bar', - 'target': b'sha1-target'}, - {'type': 'file', - 'perms': int(git.GitPerm.BLOB.value), - 'name': 'file-foo', - 'target': b'file-foo-sha1-target'}] - } - - # when - actual_directory = converters.tree_to_directory(tree) - - # then - self.assertEqual(actual_directory, expected_directory) - - @istest - def ref_to_occurrence_1(self): - # when - actual_occ = converters.ref_to_occurrence({ - 'id': 'some-id', - 'branch': 'some/branch' - }) - # then - self.assertEquals(actual_occ, { - 'id': 'some-id', - 'branch': b'some/branch' - }) - - @istest - def ref_to_occurrence_2(self): - # when - actual_occ = converters.ref_to_occurrence({ - 'id': 'some-id', - 'branch': b'some/branch' - }) - - # then - self.assertEquals(actual_occ, { - 'id': 'some-id', - 'branch': b'some/branch' - }) - - @istest - def shallow_blob(self): - # when - actual_blob = converters.shallow_blob({ - 'length': 1451, - 'sha1_git': - b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', - 'name': b'LDPCL', - 'type': b'blob', - 'sha256': - b'\xe6it!\x99\xb37UT\x8f\x0e\x8f\xd7o\x92"\xce\xa3\x1d\xd2\xe5D>M\xaaj/\x03\x138\xad\x1b', # noqa - 'perms': b'100644', - 'sha1': - b'.\x18Y\xd6M\x8c\x9a\xa4\xe1\xf1\xc7\x95\x082\xcf\xc9\xd8\nV)', - 'blake2s256': 'some-blak2s256', - 'path': - b'/tmp/tmp.c86tq5o9.swh.loader/pkg-doc-linux/copyrights/non-free/LDPCL' # noqa - }) - - # then - self.assertEqual(actual_blob, { - 'sha1': - b'.\x18Y\xd6M\x8c\x9a\xa4\xe1\xf1\xc7\x95\x082\xcf\xc9\xd8\nV)', - 'sha1_git': - b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', - 'sha256': - b'\xe6it!\x99\xb37UT\x8f\x0e\x8f\xd7o\x92"\xce\xa3\x1d\xd2\xe5D>M\xaaj/\x03\x138\xad\x1b', # noqa - 'blake2s256': 'some-blak2s256', - 'length': 1451, - }) - - @istest - def shallow_tree(self): - # when - actual_shallow_tree = converters.shallow_tree({ - 'length': 1451, - 'sha1_git': - b'tree-id', - 'type': b'tree', - 'sha256': - b'\xe6it!\x99\xb37UT\x8f\x0e\x8f\xd7o\x92"\xce\xa3\x1d\xd2\xe5D>M\xaaj/\x03\x138\xad\x1b', # noqa - 'perms': b'100644', - 'sha1': - b'.\x18Y\xd6M\x8c\x9a\xa4\xe1\xf1\xc7\x95\x082\xcf\xc9\xd8\nV)', - }) + log = Mock() - # then - 
self.assertEqual(actual_shallow_tree, b'tree-id') + expected_content = obj.copy() + expected_content.pop('data') + expected_content['status'] = 'absent' + expected_content['origin'] = 42 + expected_content['reason'] = 'Content too large' - @istest - def shallow_commit(self): - # when - actual_shallow_commit = converters.shallow_commit({ - 'sha1_git': - b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', - 'type': b'commit', - 'id': b'let-me-see-some-id', - }) - - # then - self.assertEqual(actual_shallow_commit, b'let-me-see-some-id') - - @istest - def shallow_tag(self): # when - actual_shallow_tag = converters.shallow_tag({ - 'sha1': - b'\xd1\xdd\x9a@\xeb\xf6!\x99\xd4[S\x05\xa8Y\xa3\x80\xa7\xb1;\x9c', - 'type': b'tag', - 'id': b'this-is-not-the-id-you-are-looking-for', - }) + content = converters.content_for_storage( + obj, log, max_content_size=len(data) - 1, + origin_id=expected_content['origin'], + ) # then - self.assertEqual(actual_shallow_tag, b'this-is-not-the-id-you-are-looking-for') # noqa + self.assertEqual(content, expected_content) + self.assertTrue(log.info.called) + self.assertIn('Skipping content', log.info.call_args[0][0]) + self.assertIn('too large', log.info.call_args[0][0])
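# --- Illustrative sketch (review note, not part of the patch) ---
# The happy path the new tests cover, condensed: build a content dict with
# swh.model.from_disk and run it through the converter. The payload and the
# 1 MiB cap are arbitrary example values.
from swh.model.from_disk import Content
from swh.loader.core.converters import content_for_storage

obj = Content.from_bytes(data=b'example payload', mode=0o100644).get_data()
stored = content_for_storage(obj, max_content_size=1024 * 1024)
assert stored['status'] == 'visible'
assert stored['data'] == b'example payload'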