diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 import datetime
+import logging
 import tempfile
 import os
 
@@ -19,6 +20,9 @@
 from swh.loader.core.converters import content_for_storage
 
 
+logger = logging.getLogger(__name__)
+
+
 # Not implemented yet:
 # - clean up disk routines from previous killed workers (when OOMkilled)
 # -> separation of concern would like this to be abstracted from the code
@@ -167,102 +171,107 @@
         status_visit = 'partial'  # either: partial, full
         tmp_revisions = {}
 
-        # Prepare origin and origin_visit
-        origin = {'url': self.url}
-        self.storage.origin_add([origin])
-        visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
-        visit_id = self.storage.origin_visit_add(
-            origin=self.url,
-            date=visit_date,
-            type=self.visit_type)['visit']
-
-        # Retrieve the default release (the "latest" one)
-        default_release = self.get_default_release()
-
-        # FIXME: Add load exceptions handling
-        for version in self.get_versions():  # for each
-            tmp_revisions[version] = []
-            # `a_` stands for `artifact_`
-            for a_filename, a_uri, a_metadata in self.get_artifacts(version):
-                with tempfile.TemporaryDirectory() as tmpdir:
-                    a_path, a_computed_metadata = self.fetch_artifact_archive(
-                        a_uri, dest=tmpdir)
-
-                    uncompressed_path = os.path.join(tmpdir, 'src')
-                    uncompress(a_path, dest=uncompressed_path)
-
-                    directory = Directory.from_disk(
-                        path=uncompressed_path.encode('utf-8'), data=True)
-                    # FIXME: Try not to load the full raw content in memory
-                    objects = directory.collect()
-
-                    contents = objects['content'].values()
-                    self.storage.content_add(
-                        map(content_for_storage, contents))
-
-                    status_load = 'eventful'
-                    directories = objects['directory'].values()
-
-                    self.storage.directory_add(directories)
-
-                    # FIXME: This should be release. cf. D409 discussion
-                    revision = self.build_revision(
-                        a_metadata, uncompressed_path)
-                    revision.update({
-                        'type': 'tar',
-                        'synthetic': True,
-                        'directory': directory.hash,
-                    })
-                    revision['metadata'].update({
-                        'original_artifact': a_metadata,
-                        'hashes_artifact': a_computed_metadata
-                    })
-
-                    revision['id'] = identifier_to_bytes(
-                        revision_identifier(revision))
-                    self.storage.revision_add([revision])
-
-                    tmp_revisions[version].append({
-                        'filename': a_filename,
-                        'target': revision['id'],
-                    })
-
-        # Build and load the snapshot
-        branches = {}
-        for version, v_branches in tmp_revisions.items():
-            if len(v_branches) == 1:
-                branch_name = ('releases/%s' % version).encode('utf-8')
-                if version == default_release:
-                    branches[b'HEAD'] = {
-                        'target_type': 'alias',
-                        'target': branch_name,
-                    }
+        try:
+            # Prepare origin and origin_visit
+            origin = {'url': self.url}
+            self.storage.origin_add([origin])
+            visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
+            visit_id = self.storage.origin_visit_add(
+                origin=self.url,
+                date=visit_date,
+                type=self.visit_type)['visit']
+
+            # Retrieve the default release (the "latest" one)
+            default_release = self.get_default_release()
+
+            # FIXME: Add load exceptions handling
+            for version in self.get_versions():  # for each version
+                tmp_revisions[version] = []
+                # `a_` stands for `artifact_`
+                for a_filename, a_uri, a_metadata in self.get_artifacts(
+                        version):
+                    with tempfile.TemporaryDirectory() as tmpdir:
+                        # a_c_: archive_computed_
+                        a_path, a_c_metadata = self.fetch_artifact_archive(
+                            a_uri, dest=tmpdir)
+
+                        uncompressed_path = os.path.join(tmpdir, 'src')
+                        uncompress(a_path, dest=uncompressed_path)
+
+                        directory = Directory.from_disk(
+                            path=uncompressed_path.encode('utf-8'), data=True)
+                        # FIXME: Try not to load the full raw content in memory
+                        objects = directory.collect()
+
+                        contents = objects['content'].values()
+                        self.storage.content_add(
+                            map(content_for_storage, contents))
+
+                        status_load = 'eventful'
+                        directories = objects['directory'].values()
+
+                        self.storage.directory_add(directories)
+
+                        # FIXME: This should be release. cf. D409 discussion
+                        revision = self.build_revision(
+                            a_metadata, uncompressed_path)
+                        revision.update({
+                            'type': 'tar',
+                            'synthetic': True,
+                            'directory': directory.hash,
+                        })
+                        revision['metadata'].update({
+                            'original_artifact': a_metadata,
+                            'hashes_artifact': a_c_metadata
+                        })
+
+                        revision['id'] = identifier_to_bytes(
+                            revision_identifier(revision))
+                        self.storage.revision_add([revision])
+
+                        tmp_revisions[version].append({
+                            'filename': a_filename,
+                            'target': revision['id'],
+                        })
+
+            # Build and load the snapshot
+            branches = {}
+            for version, v_branches in tmp_revisions.items():
+                if len(v_branches) == 1:
+                    branch_name = ('releases/%s' % version).encode('utf-8')
+                    if version == default_release:
+                        branches[b'HEAD'] = {
+                            'target_type': 'alias',
+                            'target': branch_name,
+                        }
 
-                branches[branch_name] = {
-                    'target_type': 'revision',
-                    'target': v_branches[0]['target'],
-                }
-            else:
-                for x in v_branches:
-                    branch_name = ('releases/%s/%s' % (
-                        version, v_branches['filename'])).encode('utf-8')
                     branches[branch_name] = {
                         'target_type': 'revision',
-                        'target': x['target'],
+                        'target': v_branches[0]['target'],
                     }
-        snapshot = {
-            'branches': branches
-        }
-        snapshot['id'] = identifier_to_bytes(
-            snapshot_identifier(snapshot))
-        self.storage.snapshot_add([snapshot])
-
-        # come so far, we actually reached a full visit
-        status_visit = 'full'
-
-        # Update the visit's state
-        self.storage.origin_visit_update(
-            origin=self.url, visit_id=visit_id, status=status_visit,
-            snapshot=snapshot)
-
-        return {'status': status_load}
+                else:
+                    for x in v_branches:
+                        branch_name = ('releases/%s/%s' % (
+                            version, x['filename'])).encode('utf-8')
+                        branches[branch_name] = {
+                            'target_type': 'revision',
+                            'target': x['target'],
+                        }
+            snapshot = {
+                'branches': branches
+            }
+            snapshot['id'] = identifier_to_bytes(
+                snapshot_identifier(snapshot))
+            self.storage.snapshot_add([snapshot])
+
+            # Reaching this point means everything was loaded: the visit is full
+            status_visit = 'full'
+
+            # Update the visit's state
+            self.storage.origin_visit_update(
+                origin=self.url, visit_id=visit_id, status=status_visit,
+                snapshot=snapshot)
+        except ValueError as e:
+            logger.warning('Failed to load %s. Reason: %s', self.url, e)
+        finally:
+            return {'status': status_load}
diff --git a/swh/loader/package/tests/test_gnu.py b/swh/loader/package/tests/test_gnu.py
--- a/swh/loader/package/tests/test_gnu.py
+++ b/swh/loader/package/tests/test_gnu.py
@@ -131,31 +131,34 @@
 # gnu used to use `release/` (singular) instead of plural
 _expected_new_snapshot_first_visit_id = 'c419397fd912039825ebdbea378bc6283f006bf5'  # noqa
 
-# def test_release_artifact_not_found(requests_mock):
-#     package = '8sync'
-#     package_url = 'https://ftp.gnu.org/gnu/8sync/'
-#     tarballs = [{
-#         'date': '944729610',
-#         'archive': 'https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz',
-#     }]
-
-#     loader = GNULoader(package, package_url, tarballs)
-#     requests_mock.get(re.compile('https://'), status_code=404)
-
-#     assert actual_load_status == {'status': 'uneventful'}
-#     stats = loader.storage.stat_counters()
-
-#     assert {
-#         'content': 0,
-#         'directory': 0,
-#         'origin': 1,
-#         'origin_visit': 1,
-#         'person': 0,
-#         'release': 0,
-#         'revision': 0,
-#         'skipped_content': 0,
-#         'snapshot': 0,
-#     } == stats
+
+def test_release_artifact_not_found(requests_mock):
+    package = '8sync'
+    package_url = 'https://ftp.gnu.org/gnu/8sync/'
+    tarballs = [{
+        'date': '944729610',
+        'archive': 'https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz',
+    }]
+
+    loader = GNULoader(package, package_url, tarballs)
+    requests_mock.get(re.compile('https://'), status_code=404)
+
+    actual_load_status = loader.load()
+
+    assert actual_load_status == {'status': 'uneventful'}
+    stats = loader.storage.stat_counters()
+
+    assert {
+        'content': 0,
+        'directory': 0,
+        'origin': 1,
+        'origin_visit': 1,
+        'person': 0,
+        'release': 0,
+        'revision': 0,
+        'skipped_content': 0,
+        'snapshot': 0,
+    } == stats
 
 
 def test_release_artifact_no_prior_visit(requests_mock):
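
Note on the error handling introduced in loader.py: the `return` inside
`finally` guarantees that load() always hands a status dict back to its caller,
but it also discards any in-flight exception that the `except ValueError`
clause did not catch. A minimal stand-alone sketch of that behavior follows;
`load_with_status` and `missing_artifact` are hypothetical names used only for
illustration, not part of the patch:

    import logging

    logger = logging.getLogger(__name__)


    def load_with_status(fetch):
        """Run fetch(), demoting failures to an 'uneventful' status."""
        status_load = 'uneventful'
        try:
            fetch()
            status_load = 'eventful'
        except ValueError as e:
            logger.warning('Failed to load. Reason: %s', e)
        finally:
            # A `return` in `finally` always wins: even an exception type
            # not caught above is dropped, and the caller still receives a
            # status dict instead of a traceback.
            return {'status': status_load}


    def missing_artifact():
        raise ValueError('artifact not found')


    assert load_with_status(missing_artifact) == {'status': 'uneventful'}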
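
The snapshot layout built by the loader is easier to see in isolation. Below is
a sketch of the branch-building loop, using made-up revision ids and the 8sync
filenames from the test. It also shows why the multi-artifact case must take
the filename from the loop variable `x`: indexing the list `v_branches` with
'filename', as an earlier revision of this patch did, raises TypeError:

    tmp_revisions = {
        '0.1.0': [
            {'filename': '8sync-0.1.0.tar.gz', 'target': b'\x01' * 20},
        ],
        '0.2.0': [
            {'filename': '8sync-0.2.0.tar.gz', 'target': b'\x02' * 20},
            {'filename': '8sync-0.2.0.tar.xz', 'target': b'\x03' * 20},
        ],
    }
    default_release = '0.1.0'

    branches = {}
    for version, v_branches in tmp_revisions.items():
        if len(v_branches) == 1:
            # One artifact: a single releases/<version> branch...
            branch_name = ('releases/%s' % version).encode('utf-8')
            if version == default_release:
                # ... and HEAD is an alias onto the default release.
                branches[b'HEAD'] = {
                    'target_type': 'alias',
                    'target': branch_name,
                }
            branches[branch_name] = {
                'target_type': 'revision',
                'target': v_branches[0]['target'],
            }
        else:
            # Several artifacts: disambiguate with the artifact filename.
            for x in v_branches:
                branch_name = ('releases/%s/%s' % (
                    version, x['filename'])).encode('utf-8')
                branches[branch_name] = {
                    'target_type': 'revision',
                    'target': x['target'],
                }

    assert branches[b'HEAD'] == {
        'target_type': 'alias',
        'target': b'releases/0.1.0',
    }
    assert b'releases/0.2.0/8sync-0.2.0.tar.xz' in branches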
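
The revived test relies on requests_mock matching URLs against a compiled
regex, so every HTTPS request returns a 404; the artifact fetch then fails and
load() goes through the new error-handling path. The same matcher can be used
outside the pytest fixture via the library's context-manager form; the URL
below is simply the tarball address from the test:

    import re

    import requests
    import requests_mock

    with requests_mock.Mocker() as m:
        # Any URL matching the regex gets a canned 404 response.
        m.get(re.compile('https://'), status_code=404)
        r = requests.get('https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz')
        assert r.status_code == 404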