diff --git a/swh/loader/pypi/client.py b/swh/loader/pypi/client.py index 3d3017b..ec43fc8 100644 --- a/swh/loader/pypi/client.py +++ b/swh/loader/pypi/client.py @@ -1,447 +1,447 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import shutil import arrow from pkginfo import UnpackedSDist import requests from swh.core import tarball from swh.model import hashutil from .converters import info, author try: from swh.loader.pypi._version import __version__ except ImportError: __version__ = 'devel' def _to_dict(pkginfo): """Given a pkginfo parsed structure, convert it to a dict. Args: pkginfo (UnpackedSDist): The sdist parsed structure Returns: parsed structure as a dict """ m = {} for k in pkginfo: m[k] = getattr(pkginfo, k) return m def _project_pkginfo(dir_path): """Given an uncompressed path holding the pkginfo file, returns a pkginfo parsed structure as a dict. The release artifact contains at their root one folder. For example: $ tar tvf zprint-0.0.6.tar.gz drwxr-xr-x root/root 0 2018-08-22 11:01 zprint-0.0.6/ ... Args: dir_path (str): Path to the uncompressed directory representing a release artifact from pypi. Returns: the pkginfo parsed structure as a dict if any or None if none was present. """ # Retrieve the root folder of the archive project_dirname = os.listdir(dir_path)[0] pkginfo_path = os.path.join(dir_path, project_dirname, 'PKG-INFO') if not os.path.exists(pkginfo_path): return None pkginfo = UnpackedSDist(pkginfo_path) return _to_dict(pkginfo) class PyPIClient: """PyPI client in charge of discussing with the pypi server. Args: base_url (str): PyPI instance's base url temp_directory (str): Path to the temporary disk location used for uncompressing the release artifacts cache (bool): Use an internal cache to keep the archives on disk. Default is not to use it. cache_dir (str): cache's disk location (relevant only with `cache` to True) Those last 2 parameters are not for production use. """ def __init__(self, base_url='https://pypi.org/pypi', temp_directory=None, cache=False, cache_dir=None): self.version = __version__ self.base_url = base_url self.temp_directory = temp_directory self.do_cache = cache if self.do_cache: self.cache_dir = cache_dir self.cache_raw_dir = os.path.join(cache_dir, 'archives') os.makedirs(self.cache_raw_dir, exist_ok=True) self.session = requests.session() self.params = { 'headers': { 'User-Agent': 'Software Heritage PyPI Loader (%s)' % ( __version__ ) } } def _save_response(self, response, project=None): """Log the response from a server request to a cache dir. Args: response (Response): full server response cache_dir (str): system path for cache dir Returns: nothing """ import gzip from json import dumps datepath = arrow.utcnow().isoformat() name = '%s.gz' % datepath if project is None else '%s-%s.gz' % ( project, datepath) fname = os.path.join(self.cache_dir, name) with gzip.open(fname, 'w') as f: f.write(bytes( dumps(response.json()), 'utf-8' )) def _save_raw(self, filepath): """In cache mode, backup the filepath to self.cache_raw_dir Args: filepath (str): Path of the file to save """ _filename = os.path.basename(filepath) _archive = os.path.join(self.cache_raw_dir, _filename) shutil.copyfile(filepath, _archive) def _get_raw(self, filepath): """In cache mode, we try to retrieve the cached file. """ _filename = os.path.basename(filepath) _archive = os.path.join(self.cache_raw_dir, _filename) if not os.path.exists(_archive): return None shutil.copyfile(_archive, filepath) return filepath def _get(self, url, project=None): """Get query to the url. Args: url (str): Url Raises: ValueError in case of failing to query Returns: Response as dict if ok """ response = self.session.get(url, **self.params) if response.status_code != 200: raise ValueError("Fail to query '%s'. Reason: %s" % ( url, response.status_code)) if self.do_cache: self._save_response(response, project=project) return response.json() def info(self, project_url, project=None): """Given a metadata project url, retrieve the raw json response Args: project_url (str): Project's pypi to retrieve information Returns: Main project information as dict. """ return self._get(project_url, project=project) def release(self, project, release): """Given a project and a release name, retrieve the raw information for said project's release. Args: project (str): Project's name release (dict): Release information Returns: Release information as dict """ release_url = '%s/%s/%s/json' % (self.base_url, project, release) return self._get(release_url, project=project) def prepare_release_artifacts(self, project, version, release_artifacts): """For a given project's release version, fetch and prepare the associated release artifacts. Args: project (str): PyPI Project version (str): Release version release_artifacts ([dict]): List of source distribution release artifacts Yields: tuple (artifact, filepath, uncompressed_path, pkginfo) where: - artifact (dict): release artifact's associated info - release (dict): release information - filepath (str): Local artifact's path - uncompressed_archive_path (str): uncompressed archive path - pkginfo (dict): package information or None if none found """ for artifact in release_artifacts: release = { 'name': version, 'message': artifact.get('comment_text', ''), } artifact = { 'sha256': artifact['digests']['sha256'], 'size': artifact['size'], 'filename': artifact['filename'], 'url': artifact['url'], 'date': artifact['upload_time'], } yield self.prepare_release_artifact(project, release, artifact) def prepare_release_artifact(self, project, release, artifact): """For a given release project, fetch and prepare the associated artifact. This: - fetches the artifact - checks the size, hashes match - uncompress the artifact locally - computes the swh hashes - returns the associated information for the artifact Args: project (str): Project's name release (dict): Release information artifact (dict): Release artifact information Returns: tuple (artifact, filepath, uncompressed_path, pkginfo) where: - release (dict): Release information (name, message) - artifact (dict): release artifact's information - filepath (str): Local artifact's path - uncompressed_archive_path (str): uncompressed archive path - pkginfo (dict): package information or None if none found """ version = release['name'] logging.debug('Release version: %s' % version) path = os.path.join(self.temp_directory, project, version) os.makedirs(path, exist_ok=True) filepath = os.path.join(path, artifact['filename']) logging.debug('Artifact local path: %s' % filepath) cache_hit = None if self.do_cache: cache_hit = self._get_raw(filepath) if cache_hit: h = hashutil.MultiHash.from_path(filepath, track_length=False) else: # no cache hit, we fetch from pypi url = artifact['url'] r = self.session.get(url, **self.params, stream=True) status = r.status_code if status != 200: if status == 404: raise ValueError("Project '%s' not found" % url) else: msg = "Fail to query '%s'\nCode: %s\nDetails: %s" % ( url, r.status_code, r.content) raise ValueError(msg) length = int(r.headers['content-length']) if length != artifact['size']: raise ValueError('Error when checking size: %s != %s' % ( artifact['size'], length)) h = hashutil.MultiHash(length=length) with open(filepath, 'wb') as f: for chunk in r.iter_content(): h.update(chunk) f.write(chunk) hashes = h.hexdigest() actual_digest = hashes['sha256'] if actual_digest != artifact['sha256']: raise ValueError( '%s %s: Checksum mismatched: %s != %s' % ( project, version, artifact['sha256'], actual_digest)) if not cache_hit and self.do_cache: self._save_raw(filepath) uncompress_path = os.path.join(path, 'uncompress') os.makedirs(uncompress_path, exist_ok=True) nature = tarball.uncompress(filepath, uncompress_path) artifact['archive_type'] = nature artifact.update(hashes) pkginfo = _project_pkginfo(uncompress_path) return release, artifact, filepath, uncompress_path, pkginfo class PyPIProject: """PyPI project representation This allows to extract information for a given project: - either its latest information (from the latest release) - either for a given release version - uncompress associated fetched release artifacts This also fetches and uncompresses the associated release artifacts. """ def __init__(self, client, project, project_metadata_url, data=None): self.client = client self.project = project self.project_metadata_url = project_metadata_url if data: self.data = data else: self.data = client.info(project_metadata_url, project) self.last_version = self.data['info']['version'] self.cache = { self.last_version: self.data } def _data(self, release_name=None): """Fetch data per release and cache it. Returns the cache retrieved data if already fetched. """ if release_name: data = self.cache.get(release_name) if not data: data = self.client.release(self.project, release_name) self.cache[release_name] = data else: data = self.data return data def info(self, release_name=None): """Compute release information for provided release (or latest one). """ return info(self._data(release_name)) def _filter_release_artifacts(self, version, releases, known_artifacts=None): """Filter not already known sdist (source distribution) release. There can be multiple 'package_type' (sdist, bdist_egg, bdist_wheel, bdist_rpm, bdist_msi, bdist_wininst, ...), we are only interested in source distribution (sdist), others bdist* are binary Args: version (str): Release name or version releases (dict/[dict]): Full release object (or a list of) known_artifacts ([tuple]): List of known releases (tuple filename, sha256) Yields: an unknown release artifact """ if not releases: return [] if not isinstance(releases, list): releases = [releases] if not known_artifacts: known_artifacts = set() for artifact in releases: name = artifact['filename'] sha256 = artifact['digests']['sha256'] if (name, sha256) in known_artifacts: logging.debug('artifact (%s, %s) already seen for release %s, skipping' % ( # noqa name, sha256, version)) continue if artifact['packagetype'] != 'sdist': continue yield artifact def _cleanup_release_artifacts(self, archive_path, directory_path): """Clean intermediary files which no longer needs to be present. """ if directory_path and os.path.exists(directory_path): logging.debug('Clean up uncompressed archive path %s' % ( directory_path, )) shutil.rmtree(directory_path) if archive_path and os.path.exists(archive_path): logging.debug('Clean up archive %s' % archive_path) os.unlink(archive_path) - def releases(self, known_artifacts): + def download_new_releases(self, known_artifacts): """Fetch metadata/data per release (if new release artifact detected) For new release artifact, this: - downloads and uncompresses the release artifacts. - yields the (release info, author info, release, dir_path) - Clean up the intermediary fetched artifact files Args: known_artifacts (tuple): artifact name, artifact sha256 hash Yields: tuple (version, release_info, release, uncompressed_path) where: - project_info (dict): release's associated version info - author (dict): Author information for the release - artifact (dict): Release artifact information - release (dict): release metadata - uncompressed_path (str): Path to uncompressed artifact """ releases_dict = self.data['releases'] for version, releases in releases_dict.items(): releases = self._filter_release_artifacts( version, releases, known_artifacts) releases = self.client.prepare_release_artifacts( self.project, version, releases) for release, artifact, archive, dir_path, pkginfo in releases: if pkginfo is None: # fallback to pypi api metadata msg = '%s %s: No PKG-INFO detected, skipping' % ( # noqa self.project, version) logging.warn(msg) continue yield pkginfo, author(pkginfo), release, artifact, dir_path self._cleanup_release_artifacts(archive, dir_path) diff --git a/swh/loader/pypi/loader.py b/swh/loader/pypi/loader.py index f855a7a..8da6b2e 100644 --- a/swh/loader/pypi/loader.py +++ b/swh/loader/pypi/loader.py @@ -1,263 +1,265 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil from tempfile import mkdtemp import arrow from swh.loader.core.utils import clean_dangling_folders from swh.loader.core.loader import SWHLoader from swh.model.from_disk import Directory from swh.model.identifiers import ( revision_identifier, snapshot_identifier, identifier_to_bytes, normalize_timestamp ) from .client import PyPIClient, PyPIProject TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.pypi.' DEBUG_MODE = '** DEBUG MODE **' class PyPILoader(SWHLoader): CONFIG_BASE_FILENAME = 'loader/pypi' ADDITIONAL_CONFIG = { 'temp_directory': ('str', '/tmp/swh.loader.pypi/'), 'cache': ('bool', False), 'cache_dir': ('str', ''), 'debug': ('bool', False), # NOT FOR PRODUCTION } def __init__(self, client=None): super().__init__(logging_class='swh.loader.pypi.PyPILoader') self.origin_id = None if not client: temp_directory = self.config['temp_directory'] os.makedirs(temp_directory, exist_ok=True) self.temp_directory = mkdtemp( suffix='-%s' % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=temp_directory) self.pypi_client = PyPIClient( temp_directory=self.temp_directory, cache=self.config['cache'], cache_dir=self.config['cache_dir']) else: self.temp_directory = client.temp_directory self.pypi_client = client self.debug = self.config['debug'] self.done = False def pre_cleanup(self): """To prevent disk explosion if some other workers exploded in mid-air (OOM killed), we try and clean up dangling files. """ if self.debug: self.log.warn('%s Will not pre-clean up temp dir %s' % ( DEBUG_MODE, self.temp_directory )) return clean_dangling_folders(self.config['temp_directory'], pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log) def cleanup(self): """Clean up temporary disk use """ if self.debug: self.log.warn('%s Will not clean up temp dir %s' % ( DEBUG_MODE, self.temp_directory )) return if os.path.exists(self.temp_directory): self.log.debug('Clean up %s' % self.temp_directory) shutil.rmtree(self.temp_directory) def prepare_origin_visit(self, project_name, origin_url, origin_metadata_url=None): """Prepare the origin visit information Args: project_name (str): Project's simple name origin_url (str): Project's main url origin_metadata_url (str): Project's metadata url """ self.origin = { 'url': origin_url, 'type': 'pypi', } self.visit_date = None # loader core will populate it def _known_artifacts(self, last_snapshot): """Retrieve the known releases/artifact for the origin_id. Args snapshot (dict): Last snapshot for the visit Returns: list of (filename, sha256) tuples. """ revs = [rev['target'] for rev in last_snapshot['branches'].values()] known_revisions = self.storage.revision_get(revs) ret = [] for revision in known_revisions: if 'original_artifact' in revision['metadata']: artifact = revision['metadata']['original_artifact'] ret.append((artifact['filename'], artifact['sha256'])) return ret def _last_snapshot(self): """Retrieve the last snapshot """ return self.storage.snapshot_get_latest(self.origin_id) def prepare(self, project_name, origin_url, origin_metadata_url=None): """Keep reference to the origin url (project) and the project metadata url Args: project_name (str): Project's simple name origin_url (str): Project's main url origin_metadata_url (str): Project's metadata url """ self.project_name = project_name self.origin_url = origin_url self.origin_metadata_url = origin_metadata_url self.project = PyPIProject(self.pypi_client, self.project_name, self.origin_metadata_url) self._prepare_state() def _prepare_state(self): """Initialize internal state (snapshot, contents, directories, etc...) This is called from `prepare` method. """ last_snapshot = self._last_snapshot() if last_snapshot: self._snapshot = last_snapshot.copy() known_artifacts = self._known_artifacts(self._snapshot) else: self._snapshot = { 'branches': {} } known_artifacts = [] # and the artifacts # that will be the source of data to retrieve - self.release_artifacts = self.project.releases(known_artifacts) + self.release_artifacts = self.project.download_new_releases( + known_artifacts + ) # temporary state self._contents = [] self._directories = [] self._revisions = [] def fetch_data(self): """Called once per release artifact version (can be many for one release). This will for each call: - retrieve a release artifact (associated to a release version) - Uncompress it and compute the necessary information - Computes the swh objects Returns: True as long as data to fetch exist """ data = None if self.done: return False try: data = next(self.release_artifacts) except StopIteration: self.done = True return False project_info, author, release, artifact, dir_path = data dir_path = dir_path.encode('utf-8') directory = Directory.from_disk(path=dir_path, data=True) _objects = directory.collect() self._contents = _objects['content'].values() self._directories = _objects['directory'].values() date = normalize_timestamp( int(arrow.get(artifact['date']).timestamp)) name = release['name'].encode('utf-8') message = release['message'].encode('utf-8') if message: message = b'%s: %s' % (name, message) else: message = name _revision = { 'synthetic': True, 'metadata': { 'original_artifact': artifact, 'project': project_info, }, 'author': author, 'date': date, 'committer': author, 'committer_date': date, 'message': message, 'directory': directory.hash, 'parents': [], 'type': 'tar', } _revision['id'] = identifier_to_bytes( revision_identifier(_revision)) self._revisions.append(_revision) branch_name = artifact['filename'].encode('utf-8') self._snapshot['branches'][branch_name] = { 'target': _revision['id'], 'target_type': 'revision', } return not self.done def generate_and_load_snapshot(self): self._snapshot['id'] = identifier_to_bytes( snapshot_identifier(self._snapshot)) self.maybe_load_snapshot(self._snapshot) def store_data(self): """(override) This sends collected objects to storage. """ self.maybe_load_contents(self._contents) self.maybe_load_directories(self._directories) self.maybe_load_revisions(self._revisions) if self.done: self.generate_and_load_snapshot() self.flush() if __name__ == '__main__': import logging import sys logging.basicConfig(level=logging.DEBUG) if len(sys.argv) != 2: logging.error('Usage: %s ' % sys.argv[0]) sys.exit(1) module_name = sys.argv[1] loader = PyPILoader() loader.load( module_name, 'https://pypi.org/projects/%s/' % module_name, 'https://pypi.org/pypi/%s/json' % module_name, ) diff --git a/swh/loader/pypi/tests/test_client.py b/swh/loader/pypi/tests/test_client.py index cc2024a..2bd2d48 100644 --- a/swh/loader/pypi/tests/test_client.py +++ b/swh/loader/pypi/tests/test_client.py @@ -1,78 +1,78 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from nose.tools import istest from swh.loader.pypi import converters from swh.loader.pypi.client import _project_pkginfo from .common import WithProjectTest class PyPIProjectTest(WithProjectTest): @istest def releases(self): - actual_releases = self.project.releases([]) + actual_releases = self.project.download_new_releases([]) expected_release_artifacts = { '1.1.0': { 'archive_type': 'zip', 'blake2s256': 'df9413bde66e6133b10cadefad6fcf9cbbc369b47831089112c846d79f14985a', # noqa 'date': '2016-01-31T05:28:42', 'filename': '0805nexter-1.1.0.zip', 'sha1': '127d8697db916ba1c67084052196a83319a25000', 'sha1_git': '4b8f1350e6d9fa00256e974ae24c09543d85b196', 'sha256': '52cd128ad3afe539478abc7440d4b043384295fbe6b0958a237cb6d926465035', # noqa 'size': 862, 'url': 'https://files.pythonhosted.org/packages/ec/65/c0116953c9a3f47de89e71964d6c7b0c783b01f29fa3390584dbf3046b4d/0805nexter-1.1.0.zip', # noqa }, '1.2.0': { 'archive_type': 'zip', 'blake2s256': '67010586b5b9a4aaa3b1c386f9dc8b4c99e6e40f37732a717a5f9b9b1185e588', # noqa 'date': '2016-01-31T05:51:25', 'filename': '0805nexter-1.2.0.zip', 'sha1': 'd55238554b94da7c5bf4a349ece0fe3b2b19f79c', 'sha1_git': '8638d33a96cb25d8319af21417f00045ec6ee810', 'sha256': '49785c6ae39ea511b3c253d7621c0b1b6228be2f965aca8a491e6b84126d0709', # noqa 'size': 898, 'url': 'https://files.pythonhosted.org/packages/c4/a0/4562cda161dc4ecbbe9e2a11eb365400c0461845c5be70d73869786809c4/0805nexter-1.2.0.zip', # noqa } } expected_releases = { '1.1.0': { 'name': '1.1.0', 'message': '', }, '1.2.0': { 'name': '1.2.0', 'message': '', }, } dir_paths = [] for pkginfo, author, release, artifact, dir_path in actual_releases: version = pkginfo['version'] expected_pkginfo = _project_pkginfo(dir_path) self.assertEquals(pkginfo, expected_pkginfo) expected_author = converters.author(expected_pkginfo) self.assertEqual(author, expected_author) expected_artifact = expected_release_artifacts[version] self.assertEqual(artifact, expected_artifact) expected_release = expected_releases[version] self.assertEqual(release, expected_release) self.assertTrue(version in dir_path) self.assertTrue(self.project_name in dir_path) # path still exists self.assertTrue(os.path.exists(dir_path)) dir_paths.append(dir_path) # Ensure uncompressed paths have been destroyed for dir_path in dir_paths: # path no longer exists self.assertFalse(os.path.exists(dir_path))