diff --git a/swh/loader/pypi/loader.py b/swh/loader/pypi/loader.py
index 0df276f..62fc969 100644
--- a/swh/loader/pypi/loader.py
+++ b/swh/loader/pypi/loader.py
@@ -1,178 +1,199 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import arrow
 import os
 import shutil
 
 from tempfile import mkdtemp
 
 from swh.loader.core.utils import clean_dangling_folders
 from swh.loader.core.loader import SWHLoader
 from swh.model.from_disk import Directory
 from swh.model.identifiers import (
     revision_identifier, snapshot_identifier,
     identifier_to_bytes, normalize_timestamp
 )
 
 from .client import PyPiClient
 from .model import PyPiProject
 
 
 TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.pypi.'
 DEBUG_MODE = '** DEBUG MODE **'
 
 
 class PyPiLoader(SWHLoader):
     CONFIG_BASE_FILENAME = 'loader/pypi'
     ADDITIONAL_CONFIG = {
         'temp_directory': ('str', '/tmp/swh.loader.pypi/'),
         'cache': ('bool', False),
         'cache_dir': ('str', ''),
         'debug': ('bool', False),  # NOT FOR PRODUCTION
     }
 
     def __init__(self):
         super().__init__(logging_class='swh.loader.pypi.PyPiLoader')
         self.origin_id = None
         temp_directory = self.config['temp_directory']
         os.makedirs(temp_directory, exist_ok=True)
         self.temp_directory = mkdtemp(
             suffix='-%s' % os.getpid(),
             prefix=TEMPORARY_DIR_PREFIX_PATTERN,
             dir=temp_directory)
         self.pypi_client = PyPiClient(
             temp_directory=self.temp_directory,
             cache=self.config['cache'],
             cache_dir=self.config['cache_dir'])
         self.debug = self.config['debug']
 
     def pre_cleanup(self):
         """(override) To prevent disk explosion if some other workers
            exploded in mid-air (OOM killed), we try and clean up
            dangling files.
 
         """
         if self.debug:
             self.log.warn('%s Will not pre-clean up temp dir %s' % (
                 DEBUG_MODE, self.temp_directory
             ))
             return
         clean_dangling_folders(self.config['temp_directory'],
                                pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
                                log=self.log)
 
     def cleanup(self):
         """(override) Clean up temporary disk use
 
         """
         if self.debug:
             self.log.warn('%s Will not clean up temp dir %s' % (
                 DEBUG_MODE, self.temp_directory
             ))
             return
         if os.path.exists(self.temp_directory):
             self.log.debug('Clean up %s' % self.temp_directory)
             shutil.rmtree(self.temp_directory)
 
     def prepare_origin_visit(self, project_name, origin_url,
                              origin_metadata_url=None):
         """(override) Prepare the origin visit information
 
         Args:
             project_name (str): Project's simple name
             origin_url (str): Project's main url
             origin_metadata_url (str): Project's metadata url
 
         """
         self.origin = {
             'url': origin_url,
             'type': 'pypi',
         }
         self.visit_date = None  # loader core will populate it
 
     def prepare(self, project_name, origin_url,
                 origin_metadata_url=None):
         """(override) Keep reference to the origin url (project) and the
            project metadata url
 
         Args:
             project_name (str): Project's simple name
             origin_url (str): Project's main url
             origin_metadata_url (str): Project's metadata url
 
         """
         self.project_name = project_name
         self.origin_url = origin_url
         self.origin_metadata_url = origin_metadata_url
         self.project = PyPiProject(self.pypi_client, self.project_name,
                                    self.origin_metadata_url)
 
+    def _known_releases(self, _last_snapshot):
+        """Retrieve the known release artifacts for the origin.
+
+        Yields:
+            tuple: (artifact's filename, artifact's sha256)
+
+        """
+        _revs = [rev['target'] for rev in _last_snapshot['branches'].values()]
+        _known_revisions = self.storage.revision_get(_revs)
+        for _rev in _known_revisions:
+            _artifact = _rev['metadata']['original_artifact']
+            yield _artifact['filename'], _artifact['sha256']
+
     def fetch_data(self):
         """(override) Fetch and collect swh objects.
 
         """
-        self._snapshot = {
-            'branches': {}
-        }
+        _last_snapshot = self.storage.snapshot_get_latest(self.origin_id)
+        if _last_snapshot:
+            self._snapshot = _last_snapshot.copy()
+            # materialize as a set: membership is tested once per artifact
+            _known_releases = set(self._known_releases(self._snapshot))
+        else:
+            self._snapshot = {
+                'branches': {}
+            }
+            _known_releases = set()
+
         self._contents = []
         self._directories = []
         self._revisions = []
         self._releases = []
 
-        for release_info, author, release, dirpath in self.project.releases():
+        for release_info, author, release, dirpath in self.project.releases(
+                _known_releases):
             dirpath = dirpath.encode('utf-8')
             directory = Directory.from_disk(path=dirpath, data=True)
             _objects = directory.collect()
 
             self._contents.extend(_objects['content'].values())
             self._directories.extend(_objects['directory'].values())
 
             date = normalize_timestamp(
                 int(arrow.get(release['date']).timestamp))
 
             name = release['name'].encode('utf-8')
             message = release['message'].encode('utf-8')
             if message:
                 message = b'%s: %s' % (name, message)
             else:
                 message = name
 
             _revision = {
                 'synthetic': True,
                 'metadata': {
                     'original_artifact': release,
                     'project': release_info,
                 },
                 'author': author,
                 'date': date,
                 'committer': author,
                 'committer_date': date,
                 'message': message,
                 'directory': directory.hash,
                 'parents': [],
                 'type': 'tar',
             }
             _revision['id'] = identifier_to_bytes(
                 revision_identifier(_revision))
             self._revisions.append(_revision)
 
             branch_name = release['filename'].encode('utf-8')
             self._snapshot['branches'][branch_name] = {
                 'target': _revision['id'],
                 'target_type': 'revision',
             }
 
         self._snapshot['id'] = identifier_to_bytes(
             snapshot_identifier(self._snapshot))
 
     def store_data(self):
         """(override) This sends collected objects to storage.
 
         """
         self.maybe_load_contents(self._contents)
         self.maybe_load_directories(self._directories)
         self.maybe_load_revisions(self._revisions)
         self.maybe_load_releases(self._releases)
         self.maybe_load_snapshot(self._snapshot)
diff --git a/swh/loader/pypi/model.py b/swh/loader/pypi/model.py
index c90000b..65fa2c4 100644
--- a/swh/loader/pypi/model.py
+++ b/swh/loader/pypi/model.py
@@ -1,212 +1,226 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import logging
 import shutil
 
 
 def info(data):
     """Given a dict of data, returns a project subset.
 
     """
     _info = data['info']
     default = {
         'home_page': _info['home_page'],
         'description': _info['description'],
         'summary': _info['summary'],
         'license': _info['license'],
         'package_url': _info['package_url'],
         'project_url': _info['project_url'],
         'upstream': None,
     }
 
     project_urls = _info.get('project_urls')
     if project_urls:
         homepage = project_urls.get('Homepage')
         if homepage:
             default['upstream'] = homepage
 
     return default
 
 
 def author(data):
     """Given a dict of data, returns an author subset.
""" name = data['author'] email = data['author_email'] if email: fullname = '%s <%s>' % (name, email) else: fullname = name if not fullname: return {'fullname': b'', 'name': None, 'email': None} if fullname: fullname = fullname.encode('utf-8') if name: name = name.encode('utf-8') if email: email = email.encode('utf-8') return {'fullname': fullname, 'name': name, 'email': email} class PyPiProject: """PyPi project representation This permits to extract information for the: - project, either the latest information (from the last revision) - either the information for a given release - Symmetrically for the release author information This also fetches and uncompress the associated release artifacts. """ def __init__(self, client, project, project_metadata_url, data=None): self.client = client self.project = project self.project_metadata_url = project_metadata_url if data: self.data = data else: self.data = client.info(project_metadata_url, project) self.last_version = self.data['info']['version'] self.cache = { self.last_version: self.data } def _data(self, release_name=None): """Fetch data per release and cache it. Returns the cache retrieved data if already fetched. """ if release_name: data = self.cache.get(release_name) if not data: data = self.client.release(self.project, release_name) self.cache[release_name] = data else: data = self.data return data def info(self, release_name=None): """Compute release information for provided release (or latest one). """ return info(self._data(release_name)) def author(self, release_name=None): """Compute author information for provided release (or latest one). """ data = self._data(release_name) return author(data['info']) - def _filter_releases(self, version, release): - """Filter release to keep only sdist (source distribution) + def _filter_releases(self, version, release, known_releases): + """Filter not already known sdist (source distribution) release. There can be multiple 'package_type' (sdist, bdist_egg, bdist_wheel, bdist_rpm, bdist_msi, bdist_wininst, ...), we are only interested in source distribution (sdist), others bdist* are binary Args: version (str): Release name or version - release (dict): Full release object + release (dict/[dict]): Full release object (or list of) + known_releases ([tuple]): List of known releases (tuple filename, + sha256) """ if not release: return [] if not isinstance(release, list): release = [release] - # Filter only on 'sdist' package type - return [rel for rel in release if rel['packagetype'] == 'sdist'] + _releases = [] + for _rel in release: + _name = _rel['filename'] + _sha256 = _rel['digests']['sha256'] + if (_name, _sha256) in known_releases: + logging.debug('artifact (%s, %s) already seen for release %s, skipping' % ( # noqa + _name, _sha256, version)) + continue + if _rel['packagetype'] != 'sdist': + continue + _releases.append(_rel) + return _releases def _cleanup_release_artifacts(self, archive_path, directory_path): """Clean intermediary files which no longer needs to be present. 
""" if directory_path and os.path.exists(directory_path): logging.debug('Clean up uncompressed archive path %s' % ( directory_path, )) shutil.rmtree(directory_path) if archive_path and os.path.exists(archive_path): logging.debug('Clean up archive %s' % archive_path) os.unlink(archive_path) def _fetch_and_uncompress_releases(self, version, releases): """Fetch an uncompress sdist releases Args: version (str): Release name or version releases ([dict]): List of source distribution release artifacts Yields: tuple (release, filepath, uncompressed_path) """ for release in releases: # flatten the metadata to ease reading _flattenned_release = { 'name': version, 'message': release.get('comment_text', ''), 'sha256': release['digests']['sha256'], 'size': release['size'], 'filename': release['filename'], 'url': release['url'], 'date': release['upload_time'], } # fetch and write locally archives yield self.client.fetch_release_artifact( self.project, _flattenned_release) - def releases(self): - """Fetch metadata and data per release. + def releases(self, known_releases): + """Fetch metadata/data per release (if new release artifact detected) - This: + For new release artifact, this: - downloads and uncompresses the release artifacts. - - yields the (version, release) + - yields the (release info, author info, release, dir_path) - Clean up the intermediary fetched artifact files + Args: + known_releases (tuple): artifact name, artifact sha256 hash + Yields: tuple (version, release_info, release, uncompressed_path) where: - release_info (dict): release's associated version info - author (dict): Author information for the release - release (dict): release metadata - uncompressed_path (str): Path to uncompressed artifact """ - # The compute information per release releases_dict = self.data['releases'] for version, releases in releases_dict.items(): - releases = self._filter_releases(version, releases) + releases = self._filter_releases(version, releases, known_releases) if not releases: logging.warn('%s %s: No source artifact found, skipping' % ( self.project, version)) continue _releases = self._fetch_and_uncompress_releases(version, releases) for _release, _archive, _dir_path, _pkginfo in _releases: _release_info = _pkginfo if _release_info is None: # fallback to pypi api metadata msg = '%s %s: No PKG-INFO detected, fallback to pypi metadata' % ( # noqa self.project, _release['name']) logging.warn(msg) _release_info = self.info(release_name=version) _author = self.author(release_name=version) else: _author = author(_release_info) yield _release_info, _author, _release, _dir_path self._cleanup_release_artifacts(_archive, _dir_path)