diff --git a/swh/loader/pypi/loader.py b/swh/loader/pypi/loader.py
index 0be7654..59664a1 100644
--- a/swh/loader/pypi/loader.py
+++ b/swh/loader/pypi/loader.py
@@ -1,209 +1,244 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import arrow
 import os
 import shutil
 
 from tempfile import mkdtemp
 
 from swh.loader.core.utils import clean_dangling_folders
 from swh.loader.core.loader import SWHLoader
 from swh.model.from_disk import Directory
 from swh.model.identifiers import (
     revision_identifier, snapshot_identifier,
     identifier_to_bytes, normalize_timestamp
 )
 
 from .client import PyPIClient, PyPIProject
 
 TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.pypi.'
 DEBUG_MODE = '** DEBUG MODE **'
 
 
 class PyPILoader(SWHLoader):
     CONFIG_BASE_FILENAME = 'loader/pypi'
     ADDITIONAL_CONFIG = {
         'temp_directory': ('str', '/tmp/swh.loader.pypi/'),
         'cache': ('bool', False),
         'cache_dir': ('str', ''),
         'debug': ('bool', False),  # NOT FOR PRODUCTION
     }
 
     def __init__(self, client=None):
         super().__init__(logging_class='swh.loader.pypi.PyPILoader')
         self.origin_id = None
         if not client:
             temp_directory = self.config['temp_directory']
             os.makedirs(temp_directory, exist_ok=True)
             self.temp_directory = mkdtemp(
                 suffix='-%s' % os.getpid(),
                 prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                 dir=temp_directory)
             self.pypi_client = PyPIClient(
                 temp_directory=self.temp_directory,
                 cache=self.config['cache'],
                 cache_dir=self.config['cache_dir'])
         else:
             self.temp_directory = client.temp_directory
             self.pypi_client = client
         self.debug = self.config['debug']
+        self.done = False
 
     def pre_cleanup(self):
         """To prevent disk explosion if some other workers exploded
         in mid-air (OOM killed), we try and clean up dangling files.
 
         """
         if self.debug:
             self.log.warn('%s Will not pre-clean up temp dir %s' % (
                 DEBUG_MODE, self.temp_directory
             ))
             return
         clean_dangling_folders(self.config['temp_directory'],
                                pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
                                log=self.log)
 
     def cleanup(self):
         """Clean up temporary disk use
 
         """
         if self.debug:
             self.log.warn('%s Will not clean up temp dir %s' % (
                 DEBUG_MODE, self.temp_directory
             ))
             return
         if os.path.exists(self.temp_directory):
             self.log.debug('Clean up %s' % self.temp_directory)
             shutil.rmtree(self.temp_directory)
 
     def prepare_origin_visit(self, project_name, origin_url,
                              origin_metadata_url=None):
         """Prepare the origin visit information
 
         Args:
             project_name (str): Project's simple name
             origin_url (str): Project's main url
             origin_metadata_url (str): Project's metadata url
 
         """
         self.origin = {
             'url': origin_url,
             'type': 'pypi',
         }
         self.visit_date = None  # loader core will populate it
 
-    def prepare(self, project_name, origin_url,
-                origin_metadata_url=None):
-        """Keep reference to the origin url (project) and the
-           project metadata url
-
-        Args:
-            project_name (str): Project's simple name
-            origin_url (str): Project's main url
-            origin_metadata_url (str): Project's metadata url
-
-        """
-        self.project_name = project_name
-        self.origin_url = origin_url
-        self.origin_metadata_url = origin_metadata_url
-        self.project = PyPIProject(self.pypi_client, self.project_name,
-                                   self.origin_metadata_url)
-
     def _known_artifacts(self, last_snapshot):
         """Retrieve the known releases/artifacts for the origin_id.
 
         Args:
             last_snapshot (dict): Last snapshot for the visit
 
         Returns:
             tuple: artifact's filename, artifact's sha256
 
         """
         revs = [rev['target'] for rev in last_snapshot['branches'].values()]
         known_revisions = self.storage.revision_get(revs)
         for revision in known_revisions:
             artifact = revision['metadata']['original_artifact']
             yield artifact['filename'], artifact['sha256']
 
     def _last_snapshot(self):
         """Retrieve the last snapshot
 
         """
         return self.storage.snapshot_get_latest(self.origin_id)
 
-    def fetch_data(self):
-        """(override) Fetch and collect swh objects.
+    def prepare(self, project_name, origin_url,
+                origin_metadata_url=None):
+        """Keep reference to the origin url (project) and the
+           project metadata url
+
+        Args:
+            project_name (str): Project's simple name
+            origin_url (str): Project's main url
+            origin_metadata_url (str): Project's metadata url
+
+        """
+        self.project_name = project_name
+        self.origin_url = origin_url
+        self.origin_metadata_url = origin_metadata_url
+        self.project = PyPIProject(self.pypi_client, self.project_name,
+                                   self.origin_metadata_url)
+        self._prepare_state()
+
+    def _prepare_state(self):
+        """Initialize internal state (snapshot, contents, directories, etc.)
+
+        This is called from the `prepare` method.
 
         """
         last_snapshot = self._last_snapshot()
         if last_snapshot:
             self._snapshot = last_snapshot.copy()
             known_artifacts = self._known_artifacts(self._snapshot)
         else:
             self._snapshot = {
                 'branches': {}
             }
             known_artifacts = []
-
+        # the release artifacts not yet seen are
+        # the source of data to retrieve
+        self.release_artifacts = self.project.releases(known_artifacts)
+        # temporary state
         self._contents = []
         self._directories = []
         self._revisions = []
 
-        for project_info, author, release, artifact, dir_path in \
-                self.project.releases(known_artifacts):
-
-            dir_path = dir_path.encode('utf-8')
-            directory = Directory.from_disk(path=dir_path, data=True)
-            _objects = directory.collect()
-
-            self._contents.extend(_objects['content'].values())
-            self._directories.extend(_objects['directory'].values())
-            date = normalize_timestamp(
-                int(arrow.get(artifact['date']).timestamp))
-
-            name = release['name'].encode('utf-8')
-            message = release['message'].encode('utf-8')
-            if message:
-                message = b'%s: %s' % (name, message)
-            else:
-                message = name
-
-            _revision = {
-                'synthetic': True,
-                'metadata': {
-                    'original_artifact': artifact,
-                    'project': project_info,
-                },
-                'author': author,
-                'date': date,
-                'committer': author,
-                'committer_date': date,
-                'message': message,
-                'directory': directory.hash,
-                'parents': [],
-                'type': 'tar',
-            }
-            _revision['id'] = identifier_to_bytes(
-                revision_identifier(_revision))
-            self._revisions.append(_revision)
-
-            branch_name = artifact['filename'].encode('utf-8')
-            self._snapshot['branches'][branch_name] = {
-                'target': _revision['id'],
-                'target_type': 'revision',
-            }
 
+    def fetch_data(self):
+        """Called once per release artifact version (there can be many for
+           one release).
+
+        Each call will:
+        - retrieve a release artifact (associated with a release version)
+        - uncompress it and compute the necessary information
+        - compute the swh objects
+
+        Returns:
+            True as long as there is data to fetch
+
+        """
+        data = None
+        if self.done:
+            return False
+
+        try:
+            data = next(self.release_artifacts)
+        except StopIteration:
+            self.done = True
+            return False
+
+        project_info, author, release, artifact, dir_path = data
+        dir_path = dir_path.encode('utf-8')
+        directory = Directory.from_disk(path=dir_path, data=True)
+        _objects = directory.collect()
+
+        self._contents = _objects['content'].values()
+        self._directories = _objects['directory'].values()
+        date = normalize_timestamp(
+            int(arrow.get(artifact['date']).timestamp))
+
+        name = release['name'].encode('utf-8')
+        message = release['message'].encode('utf-8')
+        if message:
+            message = b'%s: %s' % (name, message)
+        else:
+            message = name
+
+        _revision = {
+            'synthetic': True,
+            'metadata': {
+                'original_artifact': artifact,
+                'project': project_info,
+            },
+            'author': author,
+            'date': date,
+            'committer': author,
+            'committer_date': date,
+            'message': message,
+            'directory': directory.hash,
+            'parents': [],
+            'type': 'tar',
+        }
+        _revision['id'] = identifier_to_bytes(
+            revision_identifier(_revision))
+        self._revisions.append(_revision)
+
+        branch_name = artifact['filename'].encode('utf-8')
+        self._snapshot['branches'][branch_name] = {
+            'target': _revision['id'],
+            'target_type': 'revision',
+        }
+
+        return not self.done
+
+    def generate_and_load_snapshot(self):
         self._snapshot['id'] = identifier_to_bytes(
             snapshot_identifier(self._snapshot))
+        self.maybe_load_snapshot(self._snapshot)
 
     def store_data(self):
         """(override) This sends collected objects to storage.
 
         """
         self.maybe_load_contents(self._contents)
         self.maybe_load_directories(self._directories)
         self.maybe_load_revisions(self._revisions)
-        self.maybe_load_snapshot(self._snapshot)
+
+        if self.done:
+            self.generate_and_load_snapshot()
+        self.flush()
diff --git a/swh/loader/pypi/tests/test_loader.py b/swh/loader/pypi/tests/test_loader.py
index 9be3515..f6d5201 100644
--- a/swh/loader/pypi/tests/test_loader.py
+++ b/swh/loader/pypi/tests/test_loader.py
@@ -1,258 +1,259 @@
 # Copyright (C) 2016-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import json
 import shutil
 import tempfile
 
 from nose.plugins.attrib import attr
 from nose.tools import istest
 from unittest import TestCase
 
 from swh.model import hashutil
 
 from swh.loader.pypi.client import PyPIProject
 from swh.loader.pypi.loader import PyPILoader
 
 from .common import PyPIClientWithCache, RESOURCES_PATH, LoaderNoStorage
 
 
 class TestPyPILoader(LoaderNoStorage, PyPILoader):
     """Real PyPILoader for test purposes (storage and pypi interactions
        inhibited)
 
     """
     def __init__(self, project_name):
         project_metadata_file = '%s/%s.json' % (RESOURCES_PATH, project_name)
         project_metadata_url = 'https://pypi.org/pypi/%s/json' % project_name
         with open(project_metadata_file) as f:
             data = json.load(f)
 
         temp_dir = tempfile.mkdtemp(
             dir='/tmp/', prefix='swh.loader.pypi.tests-')
         # Will use the pypi with cache
         client = PyPIClientWithCache(
             temp_directory=temp_dir, cache_dir=RESOURCES_PATH)
         super().__init__(client=client)
         self.project = PyPIProject(
             client=client,
             project=project_name,
             project_metadata_url=project_metadata_url,
             data=data)
 
     def prepare(self, project_name, origin_url,
                 origin_metadata_url=None):
         self.project_name = project_name
         self.origin_url = origin_url
         self.origin_metadata_url = origin_metadata_url
         self.visit = 1  # first visit
+        self._prepare_state()
 
 
 @attr('fs')
 class BaseLoaderITest(TestCase):
     """Loader Test Mixin to prepare the pypi to 'load' in a test context.
 
     In this setup, the loader uses the cache to load data so no
     network interaction (no storage, no pypi).
 
     """
     def setUp(self, project_name='0805nexter',
               dummy_pypi_instance='https://dummy.org'):
         self.tmp_root_path = tempfile.mkdtemp()
         self.loader = PyPILoaderNoSnapshot(project_name=project_name)
         self._project = project_name
         self._origin_url = '%s/pypi/%s/' % (dummy_pypi_instance, project_name)
         self._project_metadata_url = '%s/pypi/%s/json' % (
             dummy_pypi_instance, project_name)
 
     def tearDown(self):
         shutil.rmtree(self.tmp_root_path)
 
     def assertContentsOk(self, expected_contents):
         contents = self.loader.all_contents
         self.assertEquals(len(contents), len(expected_contents))
         for content in contents:
             content_id = hashutil.hash_to_hex(content['sha1'])
             self.assertIn(content_id, expected_contents)
 
     def assertDirectoriesOk(self, expected_directories):
         directories = self.loader.all_directories
         self.assertEquals(len(directories), len(expected_directories))
         for _dir in directories:
             _dir_id = hashutil.hash_to_hex(_dir['id'])
             self.assertIn(_dir_id, expected_directories)
 
     def assertSnapshotOk(self, expected_snapshot, expected_revisions):
         snapshots = self.loader.all_snapshots
         self.assertEqual(len(snapshots), 1)
 
         snap = snapshots[0]
         snap_id = hashutil.hash_to_hex(snap['id'])
         self.assertEqual(snap_id, expected_snapshot)
 
         branches = snap['branches']
         self.assertEqual(len(expected_revisions), len(branches))
         for branch, target in branches.items():
             rev_id = hashutil.hash_to_hex(target['target'])
             self.assertIn(rev_id, expected_revisions)
             self.assertEqual('revision', target['target_type'])
 
     def assertRevisionsOk(self, expected_revisions):  # noqa: N802
         """Check the loader's revisions match the expected revisions.
 
         Expects self.loader to be instantiated and ready to be
         inspected (meaning the loading took place).
 
         Args:
             expected_revisions (dict): Dict with key revision id,
             value the targeted directory id.
 
         """
         # The last revision being the one used later to start back from
         for rev in self.loader.all_revisions:
             rev_id = hashutil.hash_to_hex(rev['id'])
             directory_id = hashutil.hash_to_hex(rev['directory'])
 
             self.assertEquals(expected_revisions[rev_id], directory_id)
 
 
 # Define loaders with no storage
 # They'll just accumulate the data in place
 # Only for testing purposes.
 class PyPILoaderNoSnapshot(TestPyPILoader):
     """Same as TestPyPILoader with no prior snapshot seen
 
     """
     def _last_snapshot(self):
         return None
 
 
 class LoaderITest(BaseLoaderITest):
     def setUp(self, project_name='0805nexter',
               dummy_pypi_instance='https://dummy.org'):
         super().setUp(project_name, dummy_pypi_instance)
         self.loader = PyPILoaderNoSnapshot(project_name=project_name)
 
     @istest
     def load(self):
         """Load a pypi origin
 
         """
         # when
         self.loader.load(
             self._project, self._origin_url, self._project_metadata_url)
 
         # then
         self.assertEquals(len(self.loader.all_contents), 6,
                           '3 contents per release artifact file (2)')
         self.assertEquals(len(self.loader.all_directories), 4)
         self.assertEquals(len(self.loader.all_revisions), 2,
                           '2 releases so 2 revisions should be created')
         self.assertEquals(len(self.loader.all_releases), 0,
                           'No release is created in the pypi loader')
         self.assertEquals(len(self.loader.all_snapshots), 1,
                           'Only 1 snapshot targeting all revisions')
 
         expected_contents = [
             'a61e24cdfdab3bb7817f6be85d37a3e666b34566',
             '938c33483285fd8ad57f15497f538320df82aeb8',
             'a27576d60e08c94a05006d2e6d540c0fdb5f38c8',
             '405859113963cb7a797642b45f171d6360425d16',
             'e5686aa568fdb1d19d7f1329267082fe40482d31',
             '83ecf6ec1114fd260ca7a833a2d165e71258c338',
         ]
 
         self.assertContentsOk(expected_contents)
 
         expected_directories = [
             '05219ba38bc542d4345d5638af1ed56c7d43ca7d',
             'cf019eb456cf6f78d8c4674596f1c9a97ece8f44',
             'b178b66bd22383d5f16f4f5c923d39ca798861b4',
             'c3a58f8b57433a4b56caaa5033ae2e0931405338',
         ]
 
         self.assertDirectoriesOk(expected_directories)
 
         # {revision hash: directory hash}
         expected_revisions = {
             '4c99891f93b81450385777235a37b5e966dd1571': '05219ba38bc542d4345d5638af1ed56c7d43ca7d',  # noqa
             'e445da4da22b31bfebb6ffc4383dbf839a074d21': 'b178b66bd22383d5f16f4f5c923d39ca798861b4',  # noqa
         }
 
         self.assertRevisionsOk(expected_revisions)
 
         self.assertSnapshotOk('f456b03e8bf1920d64b00df234b1efedc25b6c93',
                               expected_revisions)
 
 
 class PyPILoaderWithSnapshot(TestPyPILoader):
     """Same as TestPyPILoader, but with a prior snapshot seen
 
     """
     def _last_snapshot(self):
         return {
             'id': b'\xf4V\xb0>\x8b\xf1\x92\rd\xb0\r\xf24\xb1\xef\xed\xc2[l\x93',  # noqa
             'branches': {
                 b'0805nexter-1.1.0.zip': {
                     'target': b'L\x99\x89\x1f\x93\xb8\x14P'
                               b'8Ww#Z7\xb5\xe9f\xdd\x15q',
                     'target_type': 'revision'
                 },
                 b'0805nexter-1.2.0.zip': {
                     'target': b'\xe4E\xdaM\xa2+1\xbf'
                               b'\xeb\xb6\xff\xc48=\xbf\x83'
                               b'\x9a\x07M!',
                     'target_type': 'revision'
                 },
             },
         }
 
     def _known_artifacts(self, last_snapshot):
         yield from [
             (
                 '0805nexter-1.1.0.zip',
                 '52cd128ad3afe539478abc7440d4b043384295fbe6b0958a237cb6d926465035'  # noqa
             ),
             (
                 '0805nexter-1.2.0.zip',
                 '49785c6ae39ea511b3c253d7621c0b1b6228be2f965aca8a491e6b84126d0709'  # noqa
             )
         ]
 
 
 class LoaderWithOriginAlreadySeenITest(BaseLoaderITest):
     def setUp(self, project_name='0805nexter',
               dummy_pypi_instance='https://dummy.org'):
         super().setUp(project_name, dummy_pypi_instance)
         self.loader = PyPILoaderWithSnapshot(project_name=project_name)
 
     @istest
     def load(self):
         """Loading a pypi origin already injected results in only 1
            snapshot
 
         """
         # when
         self.loader.load(
             self._project, self._origin_url, self._project_metadata_url)
 
         # then
         self.assertEquals(len(self.loader.all_contents), 0)
         self.assertEquals(len(self.loader.all_directories), 0)
         self.assertEquals(len(self.loader.all_revisions), 0)
         self.assertEquals(len(self.loader.all_releases), 0)
         self.assertEquals(len(self.loader.all_snapshots), 1)
 
         self.assertContentsOk([])
         self.assertDirectoriesOk([])
         self.assertRevisionsOk(expected_revisions={})
 
         expected_revisions = {
             '4c99891f93b81450385777235a37b5e966dd1571': '05219ba38bc542d4345d5638af1ed56c7d43ca7d',  # noqa
             'e445da4da22b31bfebb6ffc4383dbf839a074d21': 'b178b66bd22383d5f16f4f5c923d39ca798861b4',  # noqa
         }
 
         self.assertSnapshotOk('f456b03e8bf1920d64b00df234b1efedc25b6c93',
                               expected_revisions)
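
A minimal sketch of how the reworked loader is driven (not part of the
patch). The prepare*/fetch_data/store_data contract comes from the change
above; the driving loop itself lives in swh.loader.core's SWHLoader.load,
so its exact shape below is an assumption for illustration only:

    # Hypothetical driver code, mirroring what the core loader is assumed
    # to do: fetch one release artifact per fetch_data() call, store the
    # collected objects after each call, and let the final store_data()
    # (once loader.done is True) generate and load the snapshot.
    from swh.loader.pypi.loader import PyPILoader

    loader = PyPILoader()
    loader.prepare_origin_visit('0805nexter',
                                'https://pypi.org/project/0805nexter/')
    loader.prepare('0805nexter',
                   'https://pypi.org/project/0805nexter/',
                   'https://pypi.org/pypi/0805nexter/json')
    while True:
        has_more = loader.fetch_data()  # False once artifacts are exhausted
        loader.store_data()             # snapshot only sent once done
        if not has_more:
            break
    loader.cleanup()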