# Reconstructed post-image of a line-mangled git patch touching three modules
# of swh-loader-pypi. File boundaries are marked below.

# === swh/loader/pypi/client.py ===

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import arrow
import hashlib
import logging
import os
import requests

from swh.core import tarball
from swh.model import hashutil

try:
    from swh.loader.pypi._version import __version__
except ImportError:
    __version__ = 'devel'


def convert_to_hex(d):
    """Convert a flat dictionary with bytes in values to the same
    dictionary with hex as values.

    Args:
        d (dict): flat dictionary with sha bytes in their values.

    Returns:
        Mirror dictionary with values as string hex.

    """
    if not d:
        return d

    checksums = {}
    for key, h in d.items():
        if isinstance(h, bytes):
            checksums[key] = hashutil.hash_to_hex(h)
        else:
            checksums[key] = h

    return checksums


class PyPiClient:
    """PyPi client in charge of communicating with the pypi server.

    """
    def __init__(self, temp_directory=None, cache=False, cache_dir=None):
        self.version = __version__
        if not temp_directory:
            from tempfile import mkdtemp
            # dir=None lets mkdtemp fall back to the system default
            self.temp_directory = mkdtemp(dir=temp_directory,
                                          prefix='swh.loader.pypi.client')
        else:
            self.temp_directory = temp_directory

        self.do_cache = cache
        if self.do_cache:
            self.cache_dir = cache_dir
            os.makedirs(self.cache_dir, exist_ok=True)
        self.session = requests.session()
        self.params = {
            'headers': {
                'User-Agent': 'Software Heritage PyPi Loader (%s)' % (
                    __version__
                )
            }
        }

    def _save_response(self, response):
        """Log the response from a server request to the cache dir.

        Args:
            response (Response): full server response

        Returns:
            nothing

        """
        import gzip
        from json import dumps
        datepath = arrow.utcnow().isoformat()
        fname = os.path.join(self.cache_dir, datepath + '.gz')
        with gzip.open(fname, 'w') as f:
            f.write(bytes(
                dumps(response.json()),
                'utf-8'
            ))

    def _get(self, url):
        """Get query to the url.

        Args:
            url (str): Url

        Raises:
            ValueError in case of failing to query

        Returns:
            Response as dict if ok

        """
        response = self.session.get(url, **self.params)
        if response.status_code != 200:
            raise ValueError("Fail to query '%s'. Reason: %s" % (
                url, response.status_code))

        if self.do_cache:
            self._save_response(response)

        return response.json()

    def info(self, project_url):
        """Given a metadata project url, retrieve the raw json response.

        Args:
            project_url (str): Project's pypi url to retrieve information
                               from

        Returns:
            Main project information as dict.

        """
        return self._get(project_url)

    def release(self, project, release):
        """Given a project and a release name, retrieve the raw information
        for said project's release.

        Args:
            project (str): Project's name
            release (dict): Release information

        Returns:
            Release information as dict

        """
        release_url = 'https://pypi.org/pypi/%s/%s/json' % (project, release)
        return self._get(release_url)

    def fetch_release(self, project, release):
        """Fetch for a given release project the associated artifact.

        This:
        - fetches the artifact
        - checks that the size and hashes match
        - uncompresses the artifact locally
        - computes the swh hashes
        - returns the associated information for the artifact

        Args:
            project (str): Project's name
            release (dict): Release information

        Returns:
            Release information (dict) updated with the artifact information

        """
        version = release['name']
        logging.debug('Release version: %s' % version)
        path = os.path.join(self.temp_directory, project, version)
        os.makedirs(path, exist_ok=True)
        filepath = os.path.join(path, release['filename'])
        logging.debug('Release local path: %s' % filepath)

        url = release['url']
        r = self.session.get(url, **self.params)
        if r.status_code != 200:
            raise ValueError("Fail to query '%s'. Reason: %s" % (
                url, r.status_code))
        # NOTE(review): r.content loads the whole artifact in memory;
        # iter_content below then re-iterates over the cached content.
        _len = len(r.content)
        if _len != release['size']:
            raise ValueError('Error when checking size: %s != %s' % (
                release['size'], _len))

        # checking digest and writing
        h = hashlib.sha256()
        with open(filepath, 'wb') as f:
            for chunk in r.iter_content():
                h.update(chunk)
                f.write(chunk)

        actual_digest = h.hexdigest()
        if actual_digest != release['sha256']:
            raise ValueError(
                'Error when checking the hash checksum: %s != %s' % (
                    release['sha256'], actual_digest))

        uncompress_path = os.path.join(path, 'uncompress')
        os.makedirs(uncompress_path, exist_ok=True)

        nature = tarball.uncompress(filepath, uncompress_path)
        release['directory'] = uncompress_path

        artifact = convert_to_hex(hashutil.hash_path(filepath))
        artifact['archive_type'] = nature
        for key, value in artifact.items():
            release[key] = value

        return release


# === swh/loader/pypi/loader.py ===

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import shutil

from swh.loader.core.utils import clean_dangling_folders
from swh.loader.core.loader import SWHLoader
from swh.model.from_disk import Directory
from swh.model.identifiers import (
    release_identifier, revision_identifier, snapshot_identifier,
    identifier_to_bytes, normalize_timestamp
)

from .client import PyPiClient
from .model import PyPiProject


TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.pypi.'


class PyPiLoader(SWHLoader):
    CONFIG_BASE_FILENAME = 'loader/pypi'
    ADDITIONAL_CONFIG = {
        'temp_directory': ('str', '/tmp/swh.loader.pypi/'),
        'cache': ('bool', False),
        'cache_dir': ('str', ''),
        'debug': ('bool', False),  # NOT FOR PRODUCTION
    }

    def __init__(self):
        super().__init__(logging_class='swh.loader.pypi.PyPiLoader')
        self.origin_id = None
        self.temp_directory = self.config['temp_directory']
        self.pypi_client = PyPiClient(
            temp_directory=self.temp_directory,
            cache=self.config['cache'],
            cache_dir=self.config['cache_dir'])
        self.debug = self.config['debug']

    def pre_cleanup(self):
        """(override) To prevent disk explosion if some other workers
        exploded in mid-air (OOM killed), we try to clean up dangling files.

        """
        clean_dangling_folders(self.temp_directory,
                               pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
                               log=self.log)

    def cleanup(self):
        """(override) Clean up temporary disk use

        """
        if self.debug:
            # fix: `warn` is a deprecated alias of `warning` in stdlib logging
            self.log.warning(
                '** DEBUG MODE ** Will not clean up temp dir %s' % (
                    self.temp_directory
                ))
            return
        if os.path.exists(self.temp_directory):
            self.log.debug('Clean up %s' % self.temp_directory)
            shutil.rmtree(self.temp_directory)

    def prepare_origin_visit(self, project_name, origin_url,
                             origin_metadata_url=None):
        """(override) Prepare the origin visit information

        Args:
            project_name (str): Project's simple name
            origin_url (str): Project's main url
            origin_metadata_url (str): Project's metadata url

        """
        self.origin = {
            'url': origin_url,
            'type': 'pypi',
        }
        self.visit_date = None  # loader core will populate it

    def prepare(self, project_name, origin_url,
                origin_metadata_url=None):
        """(override) Keep reference to the origin url (project) and the
        project metadata url

        Args:
            project_name (str): Project's simple name
            origin_url (str): Project's main url
            origin_metadata_url (str): Project's metadata url

        """
        self.project_name = project_name
        self.origin_url = origin_url
        self.origin_metadata_url = origin_metadata_url
        self.project = PyPiProject(self.pypi_client, self.project_name,
                                   self.origin_metadata_url)

    def fetch_data(self):
        """(override) This will fetch and prepare the needed releases.

        """
        self.pypi_releases = self.project.releases()

    def store_data(self):
        """(override) This collects the necessary objects information and
        sends them to storage.

        """
        _snapshot = {
            'branches': {}
        }
        _contents = []
        _directories = []
        _revisions = []
        _releases = []

        for version, _release in self.pypi_releases:
            info = self.project.info(version)
            author = self.project.author(version)
            logging.debug('author: %s' % author)
            release = _release['release']
            _dir_path = release.pop('directory')
            _dir_path = _dir_path.encode('utf-8')
            directory = Directory.from_disk(path=_dir_path, data=True)
            _objects = directory.collect()

            _contents.extend(_objects['content'].values())
            _directories.extend(_objects['directory'].values())

            # NOTE(review): assumes arrow's `.timestamp` is a property
            # (pre-1.0 arrow); confirm against the pinned arrow version
            date = normalize_timestamp(
                int(arrow.get(release['date']).timestamp))

            name = release['name'].encode('utf-8')
            message = release['message'].encode('utf-8')
            _revision = {
                'synthetic': True,
                'metadata': {
                    'original_artifact': [release],
                    'project': info,
                },
                'author': author,
                'date': date,
                'committer': author,
                'committer_date': date,
                'name': name,
                'message': message,
                'directory': directory.hash,
                'parents': [],
                'type': 'tar',
            }
            _revision['id'] = identifier_to_bytes(
                revision_identifier(_revision))
            _revisions.append(_revision)

            # NOTE(review): the revision above is marked synthetic but the
            # release is not — confirm this asymmetry is intended
            _release = {
                'name': name,
                'author': author,
                'date': date,
                'message': message,
                'target_type': 'revision',
                'target': _revision['id'],
                'synthetic': False,
            }
            _release['id'] = identifier_to_bytes(
                release_identifier(_release))
            _releases.append(_release)

            _snapshot['branches'][name] = {
                'target': _release['id'],
                'target_type': 'release',
            }

        _snapshot['id'] = identifier_to_bytes(
            snapshot_identifier(_snapshot))

        self.maybe_load_contents(_contents)
        self.maybe_load_directories(_directories)
        self.maybe_load_revisions(_revisions)
        self.maybe_load_releases(_releases)
        self.maybe_load_snapshot(_snapshot)


# === swh/loader/pypi/model.py ===

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


def info(data):
    """Given a dict of data, returns a project subset.

    """
    info = data['info']
    default = {
        'home_page': info['home_page'],
        'description': info['description'],
        'summary': info['summary'],
        'license': info['license'],
        'package_url': info['package_url'],
        'project_url': info['project_url'],
        'upstream': None,
    }

    project_urls = info.get('project_urls')
    if project_urls:
        homepage = project_urls.get('Homepage')
        if homepage:
            default['upstream'] = homepage

    return default


def author(data):
    """Given a dict of data, returns an author subset.

    """
    # fix: PyPI's JSON API may return null for author/author_email;
    # coalesce to '' so the .encode() calls below cannot crash
    name = data['info']['author'] or ''
    email = data['info']['author_email'] or ''
    if email:
        fullname = '%s <%s>' % (name, email)
    else:
        fullname = name

    return {
        'fullname': fullname.encode('utf-8'),
        'name': name.encode('utf-8'),
        'email': email.encode('utf-8'),
    }


class PyPiProject:
    """PyPi project representation

    This permits to extract information for the:
    - project, either the latest information (from the last revision)
    - or the information for a given release
    - Symmetrically for the release author information

    This also fetches and uncompresses the associated release artifacts.

    """
    def __init__(self, client, project, project_metadata_url, data=None):
        self.client = client
        self.project = project
        self.project_metadata_url = project_metadata_url
        if data:
            self.data = data
        else:
            self.data = client.info(project_metadata_url)

        self.last_version = self.data['info']['version']
        self.cache = {
            self.last_version: self.data
        }

    def _data(self, release_name=None):
        """Fetch data per release and cache it. Returns the cached data if
        already fetched.

        """
        if release_name:
            data = self.cache.get(release_name)
            if not data:
                data = self.client.release(self.project, release_name)
                self.cache[release_name] = data
        else:
            data = self.data

        return data

    def info(self, release_name=None):
        """Compute release information for release provided or the latest
        one.

        """
        return info(self._data(release_name))

    def author(self, release_name=None):
        """Compute author for the provided release if provided (use the
        latest release otherwise).

        """
        return author(self._data(release_name))

    def releases(self):
        """Fetch metadata and data per release.

        This downloads and uncompresses the release artifacts.

        Yields:
            tuple (version, release)

        """
        # Compute information per release
        releases_dict = self.data['releases']
        for version in releases_dict:
            release = releases_dict[version]
            if version == self.last_version:  # avoid an extra query
                release_info = self.info()
            else:
                release_info = self.info(release_name=version)

            # FIXME: there can be multiple 'package_type' here:
            # sdist, bdist_egg, bdist_wheel
            if isinstance(release, list):
                if not release:
                    continue
                if len(release) > 1:
                    raise ValueError(
                        'Unsupported other formats for now, failing!')
                release = release[0]

            # flatten the metadata to ease reading
            _flattened_release = {  # fix: was misspelled _flattenned_release
                'name': version,
                'message': release['comment_text'],
                'sha256': release['digests']['sha256'],
                'size': release['size'],
                'filename': release['filename'],
                'url': release['url'],
                'date': release['upload_time'],
            }
            # fetch and write archives locally
            _release = self.client.fetch_release(
                self.project, _flattened_release)
            yield version, {
                'info': release_info,
                'release': _release,
            }