diff --git a/swh/loader/pypi/client.py b/swh/loader/pypi/client.py index d7d97a9..45c30de 100644 --- a/swh/loader/pypi/client.py +++ b/swh/loader/pypi/client.py @@ -1,462 +1,471 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow import hashlib import logging import os import requests import shutil from .converters import info, author from pkginfo import UnpackedSDist from swh.core import tarball from swh.model import hashutil try: from swh.loader.pypi._version import __version__ except ImportError: __version__ = 'devel' def convert_to_hex(d): """Convert a flat dictionary with bytes in values to the same dictionary with hex as values. Args: dict: flat dictionary with sha bytes in their values. Returns: Mirror dictionary with values as string hex. """ if not d: return d checksums = {} for key, h in d.items(): if isinstance(h, bytes): checksums[key] = hashutil.hash_to_hex(h) else: checksums[key] = h return checksums def _to_dict(pkginfo): """Given a pkginfo parsed structure, convert it to a dict. Args: pkginfo (UnpackedSDist): The sdist parsed structure Returns: parsed structure as a dict """ m = {} for k in pkginfo: m[k] = getattr(pkginfo, k) return m def _project_pkginfo(dir_path): """Given an uncompressed path holding the pkginfo file, returns a pkginfo parsed structure as a dict. The release artifact contains at their root one folder. For example: $ tar tvf zprint-0.0.6.tar.gz drwxr-xr-x root/root 0 2018-08-22 11:01 zprint-0.0.6/ ... Args: dir_path (str): Path to the uncompressed directory representing a release artifact from pypi. Returns: the pkginfo parsed structure as a dict if any or None if none was present. """ # Retrieve the root folder of the archive project_dirname = os.listdir(dir_path)[0] pkginfo_path = os.path.join(dir_path, project_dirname, 'PKG-INFO') if not os.path.exists(pkginfo_path): return None pkginfo = UnpackedSDist(pkginfo_path) return _to_dict(pkginfo) class PyPIClient: """PyPI client in charge of discussing with the pypi server. Args: base_url (str): PyPI instance's base url temp_directory (str): Path to the temporary disk location used for uncompressing the release artifacts cache (bool): Use an internal cache to keep the archives on disk. Default is not to use it. cache_dir (str): cache's disk location (relevant only with `cache` to True) Those last 2 parameters are not for production use. """ def __init__(self, base_url='https://pypi.org/pypi', temp_directory=None, cache=False, cache_dir=None): self.version = __version__ self.base_url = base_url self.temp_directory = temp_directory self.do_cache = cache if self.do_cache: self.cache_dir = cache_dir self.cache_raw_dir = os.path.join(cache_dir, 'archives') os.makedirs(self.cache_raw_dir, exist_ok=True) self.session = requests.session() self.params = { 'headers': { 'User-Agent': 'Software Heritage PyPI Loader (%s)' % ( __version__ ) } } def _save_response(self, response, project=None): """Log the response from a server request to a cache dir. Args: response (Response): full server response cache_dir (str): system path for cache dir Returns: nothing """ import gzip from json import dumps datepath = arrow.utcnow().isoformat() name = '%s.gz' % datepath if project is None else '%s-%s.gz' % ( project, datepath) fname = os.path.join(self.cache_dir, name) with gzip.open(fname, 'w') as f: f.write(bytes( dumps(response.json()), 'utf-8' )) def _save_raw(self, filepath): """In cache mode, backup the filepath to self.cache_raw_dir Args: filepath (str): Path of the file to save """ _filename = os.path.basename(filepath) _archive = os.path.join(self.cache_raw_dir, _filename) shutil.copyfile(filepath, _archive) def _get_raw(self, filepath): """In cache mode, we try to retrieve the cached file. """ _filename = os.path.basename(filepath) _archive = os.path.join(self.cache_raw_dir, _filename) if not os.path.exists(_archive): return None shutil.copyfile(_archive, filepath) return filepath def _get(self, url, project=None): """Get query to the url. Args: url (str): Url Raises: ValueError in case of failing to query Returns: Response as dict if ok """ response = self.session.get(url, **self.params) if response.status_code != 200: raise ValueError("Fail to query '%s'. Reason: %s" % ( url, response.status_code)) if self.do_cache: self._save_response(response, project=project) return response.json() def info(self, project_url, project=None): """Given a metadata project url, retrieve the raw json response Args: project_url (str): Project's pypi to retrieve information Returns: Main project information as dict. """ return self._get(project_url, project=project) def release(self, project, release): """Given a project and a release name, retrieve the raw information for said project's release. Args: project (str): Project's name release (dict): Release information Returns: Release information as dict """ release_url = '%s/%s/%s/json' % (self.base_url, project, release) return self._get(release_url, project=project) - def prepare_release_artifact(self, project, artifact): + def prepare_release_artifacts(self, project, version, release_artifacts): + """For a given project's release version, fetch and prepare the + associated release artifacts. + + Args: + project (str): PyPI Project + version (str): Release version + release_artifacts ([dict]): List of source distribution + release artifacts + + Yields: + tuple (artifact, filepath, uncompressed_path, pkginfo) where: + + - artifact (dict): release artifact's associated info + - release (dict): release information + - filepath (str): Local artifact's path + - uncompressed_archive_path (str): uncompressed archive path + - pkginfo (dict): package information or None if none found + + """ + for artifact in release_artifacts: + release = { + 'name': version, + 'message': artifact.get('comment_text', ''), + } + artifact = { + 'sha256': artifact['digests']['sha256'], + 'size': artifact['size'], + 'filename': artifact['filename'], + 'url': artifact['url'], + 'date': artifact['upload_time'], + } + yield self.prepare_release_artifact(project, release, artifact) + + def prepare_release_artifact(self, project, release, artifact): """For a given release project, fetch and prepare the associated artifact. This: - fetches the artifact - checks the size, hashes match - uncompress the artifact locally - computes the swh hashes - returns the associated information for the artifact Args: project (str): Project's name + release (dict): Release information artifact (dict): Release artifact information Returns: - tuple (release, archive_path, uncompress_archive_path, pkginfo): + tuple (artifact, filepath, uncompressed_path, pkginfo) where: - release (dict): release information - archive_path (str): fetched archive - uncompressed_archive_path (str): uncompressed archive path - pkginfo (dict): package information or None if none found + - release (dict): Release information (name, message) + - artifact (dict): release artifact's information + - filepath (str): Local artifact's path + - uncompressed_archive_path (str): uncompressed archive path + - pkginfo (dict): package information or None if none found """ - version = artifact['name'] + version = release['name'] logging.debug('Release version: %s' % version) path = os.path.join(self.temp_directory, project, version) os.makedirs(path, exist_ok=True) filepath = os.path.join(path, artifact['filename']) logging.debug('Artifact local path: %s' % filepath) _filepath = None if self.do_cache: _filepath = self._get_raw(filepath) if not _filepath: # no cache hit, we fetch from pypi url = artifact['url'] r = self.session.get(url, **self.params) status = r.status_code if status != 200: if status == 404: raise ValueError("Project '%s' not found" % url) else: msg = "Fail to query '%s'\nCode: %s\nDetails: %s" % ( url, r.status_code, r.content) raise ValueError(msg) _len = len(r.content) if _len != artifact['size']: raise ValueError('Error when checking size: %s != %s' % ( artifact['size'], _len)) # checking digest and writing h = hashlib.sha256() with open(filepath, 'wb') as f: for chunk in r.iter_content(): h.update(chunk) f.write(chunk) actual_digest = h.hexdigest() if actual_digest != artifact['sha256']: raise ValueError( '%s %s: Checksum mismatched: %s != %s' % ( project, version, artifact['sha256'], actual_digest)) if self.do_cache: self._save_raw(filepath) uncompress_path = os.path.join(path, 'uncompress') os.makedirs(uncompress_path, exist_ok=True) nature = tarball.uncompress(filepath, uncompress_path) hashes = hashutil.hash_path(filepath) hashes.pop('length') # 'size' entry is already referenced artifact_hashes = convert_to_hex(hashes) artifact['archive_type'] = nature artifact.update(artifact_hashes) pkginfo = _project_pkginfo(uncompress_path) - return artifact, filepath, uncompress_path, pkginfo + return release, artifact, filepath, uncompress_path, pkginfo class PyPIProject: """PyPI project representation This allows to extract information for the: - project, either the latest information (from the last revision) - either the information for a given release (artifact) - Symmetrically for the release artifact author information This also fetches and uncompress the associated release artifacts. """ def __init__(self, client, project, project_metadata_url, data=None): self.client = client self.project = project self.project_metadata_url = project_metadata_url if data: self.data = data else: self.data = client.info(project_metadata_url, project) self.last_version = self.data['info']['version'] self.cache = { self.last_version: self.data } def _data(self, release_name=None): """Fetch data per release and cache it. Returns the cache retrieved data if already fetched. """ if release_name: data = self.cache.get(release_name) if not data: data = self.client.release(self.project, release_name) self.cache[release_name] = data else: data = self.data return data def info(self, release_name=None): """Compute release information for provided release (or latest one). """ return info(self._data(release_name)) def _filter_releases(self, version, release, known_releases): """Filter not already known sdist (source distribution) release. There can be multiple 'package_type' (sdist, bdist_egg, bdist_wheel, bdist_rpm, bdist_msi, bdist_wininst, ...), we are only interested in source distribution (sdist), others bdist* are binary Args: version (str): Release name or version release (dict/[dict]): Full release object (or list of) known_releases ([tuple]): List of known releases (tuple filename, sha256) """ if not release: return [] if not isinstance(release, list): release = [release] releases = [] for rel in release: name = rel['filename'] sha256 = rel['digests']['sha256'] if (name, sha256) in known_releases: logging.debug('artifact (%s, %s) already seen for release %s, skipping' % ( # noqa name, sha256, version)) continue if rel['packagetype'] != 'sdist': continue releases.append(rel) return releases def _cleanup_release_artifacts(self, archive_path, directory_path): """Clean intermediary files which no longer needs to be present. """ if directory_path and os.path.exists(directory_path): logging.debug('Clean up uncompressed archive path %s' % ( directory_path, )) shutil.rmtree(directory_path) if archive_path and os.path.exists(archive_path): logging.debug('Clean up archive %s' % archive_path) os.unlink(archive_path) - def _fetch_and_uncompress_releases(self, version, releases): - """Fetch an uncompress sdist releases - - Args: - version (str): Release name or version - releases ([dict]): List of source distribution release artifacts - - Yields: - tuple (release, filepath, uncompressed_path) - - """ - for release in releases: - # flatten the metadata to ease reading - flattened_release = { - 'name': version, - 'message': release.get('comment_text', ''), - 'sha256': release['digests']['sha256'], - 'size': release['size'], - 'filename': release['filename'], - 'url': release['url'], - 'date': release['upload_time'], - } - - # fetch and write locally archives - yield self.client.prepare_release_artifact( - self.project, flattened_release) - def releases(self, known_releases): """Fetch metadata/data per release (if new release artifact detected) For new release artifact, this: - downloads and uncompresses the release artifacts. - yields the (release info, author info, release, dir_path) - Clean up the intermediary fetched artifact files Args: known_releases (tuple): artifact name, artifact sha256 hash Yields: tuple (version, release_info, release, uncompressed_path) where: - - release_info (dict): release's associated version info + - project_info (dict): release's associated version info - author (dict): Author information for the release + - artifact (dict): Release artifact information - release (dict): release metadata - uncompressed_path (str): Path to uncompressed artifact """ releases_dict = self.data['releases'] for version, releases in releases_dict.items(): releases = self._filter_releases(version, releases, known_releases) if not releases: logging.warn('%s %s: No source artifact found, skipping' % ( self.project, version)) continue - _releases = self._fetch_and_uncompress_releases(version, releases) - for _release, _archive, _dir_path, _pkginfo in _releases: - _release_info = _pkginfo - if _release_info is None: # fallback to pypi api metadata + releases = self.client.prepare_release_artifacts( + self.project, version, releases) + for release, artifact, archive, dir_path, pkginfo in releases: + if pkginfo is None: # fallback to pypi api metadata msg = '%s %s: No PKG-INFO detected, skipping' % ( # noqa - self.project, _release['name']) + self.project, version) logging.warn(msg) continue - _author = author(_release_info) - yield _release_info, _author, _release, _dir_path - self._cleanup_release_artifacts(_archive, _dir_path) + yield pkginfo, author(pkginfo), release, artifact, dir_path + self._cleanup_release_artifacts(archive, dir_path) diff --git a/swh/loader/pypi/loader.py b/swh/loader/pypi/loader.py index ae6db15..a299aea 100644 --- a/swh/loader/pypi/loader.py +++ b/swh/loader/pypi/loader.py @@ -1,205 +1,206 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow import os import shutil from tempfile import mkdtemp from swh.loader.core.utils import clean_dangling_folders from swh.loader.core.loader import SWHLoader from swh.model.from_disk import Directory from swh.model.identifiers import ( revision_identifier, snapshot_identifier, identifier_to_bytes, normalize_timestamp ) from .client import PyPIClient, PyPIProject TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.pypi.' DEBUG_MODE = '** DEBUG MODE **' class PyPILoader(SWHLoader): CONFIG_BASE_FILENAME = 'loader/pypi' ADDITIONAL_CONFIG = { 'temp_directory': ('str', '/tmp/swh.loader.pypi/'), 'cache': ('bool', False), 'cache_dir': ('str', ''), 'debug': ('bool', False), # NOT FOR PRODUCTION } def __init__(self, client=None): super().__init__(logging_class='swh.loader.pypi.PyPILoader') self.origin_id = None if not client: temp_directory = self.config['temp_directory'] os.makedirs(temp_directory, exist_ok=True) self.temp_directory = mkdtemp( suffix='-%s' % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=temp_directory) self.pypi_client = PyPIClient( temp_directory=self.temp_directory, cache=self.config['cache'], cache_dir=self.config['cache_dir']) else: self.temp_directory = client.temp_directory self.pypi_client = client self.debug = self.config['debug'] def pre_cleanup(self): """To prevent disk explosion if some other workers exploded in mid-air (OOM killed), we try and clean up dangling files. """ if self.debug: self.log.warn('%s Will not pre-clean up temp dir %s' % ( DEBUG_MODE, self.temp_directory )) return clean_dangling_folders(self.config['temp_directory'], pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log) def cleanup(self): """Clean up temporary disk use """ if self.debug: self.log.warn('%s Will not clean up temp dir %s' % ( DEBUG_MODE, self.temp_directory )) return if os.path.exists(self.temp_directory): self.log.debug('Clean up %s' % self.temp_directory) shutil.rmtree(self.temp_directory) def prepare_origin_visit(self, project_name, origin_url, origin_metadata_url=None): """Prepare the origin visit information Args: project_name (str): Project's simple name origin_url (str): Project's main url origin_metadata_url (str): Project's metadata url """ self.origin = { 'url': origin_url, 'type': 'pypi', } self.visit_date = None # loader core will populate it def prepare(self, project_name, origin_url, origin_metadata_url=None): """Keep reference to the origin url (project) and the project metadata url Args: project_name (str): Project's simple name origin_url (str): Project's main url origin_metadata_url (str): Project's metadata url """ self.project_name = project_name self.origin_url = origin_url self.origin_metadata_url = origin_metadata_url self.project = PyPIProject(self.pypi_client, self.project_name, self.origin_metadata_url) def _known_releases(self, _last_snapshot): """Retrieve the known releases/artifact for the origin_id. Returns tuple artifact's filename, artifact's sha256 """ _revs = [rev['target'] for rev in _last_snapshot['branches'].values()] _known_revisions = self.storage.revision_get(_revs) for _rev in _known_revisions: _artifact = _rev['metadata']['original_artifact'] yield _artifact['filename'], _artifact['sha256'] def _last_snapshot(self): """Retrieve the last snapshot """ return self.storage.snapshot_get_latest(self.origin_id) def fetch_data(self): """(override) Fetch and collect swh objects. """ last_snapshot = self._last_snapshot() if last_snapshot: self._snapshot = last_snapshot.copy() known_releases = self._known_releases(self._snapshot) else: self._snapshot = { 'branches': {} } known_releases = [] self._contents = [] self._directories = [] self._revisions = [] - for release_info, author, release, dirpath in self.project.releases( - known_releases): - dirpath = dirpath.encode('utf-8') - directory = Directory.from_disk(path=dirpath, data=True) + for project_info, author, release, artifact, dir_path in \ + self.project.releases(known_releases): + + dir_path = dir_path.encode('utf-8') + directory = Directory.from_disk(path=dir_path, data=True) _objects = directory.collect() self._contents.extend(_objects['content'].values()) self._directories.extend(_objects['directory'].values()) date = normalize_timestamp( - int(arrow.get(release['date']).timestamp)) + int(arrow.get(artifact['date']).timestamp)) name = release['name'].encode('utf-8') message = release['message'].encode('utf-8') if message: message = b'%s: %s' % (name, message) else: message = name _revision = { 'synthetic': True, 'metadata': { - 'original_artifact': release, - 'project': release_info, + 'original_artifact': artifact, + 'project': project_info, }, 'author': author, 'date': date, 'committer': author, 'committer_date': date, 'message': message, 'directory': directory.hash, 'parents': [], 'type': 'tar', } _revision['id'] = identifier_to_bytes( revision_identifier(_revision)) self._revisions.append(_revision) - branch_name = release['filename'].encode('utf-8') + branch_name = artifact['filename'].encode('utf-8') self._snapshot['branches'][branch_name] = { 'target': _revision['id'], 'target_type': 'revision', } self._snapshot['id'] = identifier_to_bytes( snapshot_identifier(self._snapshot)) def store_data(self): """(override) This sends collected objects to storage. """ self.maybe_load_contents(self._contents) self.maybe_load_directories(self._directories) self.maybe_load_revisions(self._revisions) self.maybe_load_snapshot(self._snapshot) diff --git a/swh/loader/pypi/tests/test_client.py b/swh/loader/pypi/tests/test_client.py index c299c62..cc2024a 100644 --- a/swh/loader/pypi/tests/test_client.py +++ b/swh/loader/pypi/tests/test_client.py @@ -1,69 +1,78 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from nose.tools import istest +from swh.loader.pypi import converters from swh.loader.pypi.client import _project_pkginfo -from swh.loader.pypi.converters import author from .common import WithProjectTest class PyPIProjectTest(WithProjectTest): @istest def releases(self): actual_releases = self.project.releases([]) - expected_releases = { + expected_release_artifacts = { '1.1.0': { 'archive_type': 'zip', 'blake2s256': 'df9413bde66e6133b10cadefad6fcf9cbbc369b47831089112c846d79f14985a', # noqa 'date': '2016-01-31T05:28:42', 'filename': '0805nexter-1.1.0.zip', - 'message': '', - 'name': '1.1.0', 'sha1': '127d8697db916ba1c67084052196a83319a25000', 'sha1_git': '4b8f1350e6d9fa00256e974ae24c09543d85b196', 'sha256': '52cd128ad3afe539478abc7440d4b043384295fbe6b0958a237cb6d926465035', # noqa 'size': 862, 'url': 'https://files.pythonhosted.org/packages/ec/65/c0116953c9a3f47de89e71964d6c7b0c783b01f29fa3390584dbf3046b4d/0805nexter-1.1.0.zip', # noqa }, '1.2.0': { 'archive_type': 'zip', 'blake2s256': '67010586b5b9a4aaa3b1c386f9dc8b4c99e6e40f37732a717a5f9b9b1185e588', # noqa 'date': '2016-01-31T05:51:25', 'filename': '0805nexter-1.2.0.zip', - 'message': '', - 'name': '1.2.0', 'sha1': 'd55238554b94da7c5bf4a349ece0fe3b2b19f79c', 'sha1_git': '8638d33a96cb25d8319af21417f00045ec6ee810', 'sha256': '49785c6ae39ea511b3c253d7621c0b1b6228be2f965aca8a491e6b84126d0709', # noqa 'size': 898, 'url': 'https://files.pythonhosted.org/packages/c4/a0/4562cda161dc4ecbbe9e2a11eb365400c0461845c5be70d73869786809c4/0805nexter-1.2.0.zip', # noqa } } + expected_releases = { + '1.1.0': { + 'name': '1.1.0', + 'message': '', + }, + '1.2.0': { + 'name': '1.2.0', + 'message': '', + }, + } + dir_paths = [] - for _release_info, _author, _release, _dir_path in actual_releases: - version = _release_info['version'] - expected_pkginfo = _project_pkginfo(_dir_path) - self.assertEquals(_release_info, expected_pkginfo) - expected_author = author(expected_pkginfo) - self.assertEqual(_author, expected_author) + for pkginfo, author, release, artifact, dir_path in actual_releases: + version = pkginfo['version'] + expected_pkginfo = _project_pkginfo(dir_path) + self.assertEquals(pkginfo, expected_pkginfo) + expected_author = converters.author(expected_pkginfo) + self.assertEqual(author, expected_author) + expected_artifact = expected_release_artifacts[version] + self.assertEqual(artifact, expected_artifact) expected_release = expected_releases[version] - self.assertEqual(_release, expected_release) + self.assertEqual(release, expected_release) - self.assertTrue(version in _dir_path) - self.assertTrue(self.project_name in _dir_path) + self.assertTrue(version in dir_path) + self.assertTrue(self.project_name in dir_path) # path still exists - self.assertTrue(os.path.exists(_dir_path)) - dir_paths.append(_dir_path) + self.assertTrue(os.path.exists(dir_path)) + dir_paths.append(dir_path) # Ensure uncompressed paths have been destroyed - for _dir_path in dir_paths: + for dir_path in dir_paths: # path no longer exists - self.assertFalse(os.path.exists(_dir_path)) + self.assertFalse(os.path.exists(dir_path))