diff --git a/swh/loader/npm/client.py b/swh/loader/npm/client.py
index 8e07da5..a4f51ca 100644
--- a/swh/loader/npm/client.py
+++ b/swh/loader/npm/client.py
@@ -1,219 +1,219 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import logging
 import os

 import chardet
 import requests

 from swh.core import tarball
 from swh.model import hashutil

 from swh.loader.npm.utils import extract_npm_package_author


 class NpmClient:
     """
     Helper class internally used by the npm loader to fetch metadata
     for a specific package hosted on the npm registry.

     Args:
         temp_dir (str): Path to the temporary disk location used to
             uncompress the package tarballs
     """
     def __init__(self, temp_dir, log=None):
         self.root_temp_dir = temp_dir
         self.session = requests.session()
         self.params = {
             'headers': {
                 'User-Agent': 'Software Heritage npm loader'
             }
         }
         self.log = log or logging

     def fetch_package_metadata(self, package_metadata_url):
         """
         Fetch metadata for a given package and make it the focused one.
         This must be called prior to any other operations performed
         by the other methods below.

         Args:
             package_metadata_url: the package metadata url provided
                 by the npm loader
         """
         self.package_metadata_url = package_metadata_url
         self.package_metadata = self._request(self.package_metadata_url).json()
         self.package = self.package_metadata['name']
         self.temp_dir = os.path.join(self.root_temp_dir, self.package)

     def latest_package_version(self):
         """
         Return the last released version of the focused package.

         Returns:
             str: the last released package version
         """
         latest = ''
         if 'latest' in self.package_metadata['dist-tags']:
             latest = self.package_metadata['dist-tags']['latest']
         return latest

     def package_versions(self, known_versions=None):
         """
         Return the available versions for the focused package.

         Args:
             known_versions (dict): may be provided by the loader; it enables
                 filtering out versions already ingested in the archive.

         Returns:
             dict: A dict whose keys are Tuple[version, tarball_sha1] and
             values are dicts with the following entries:

                 * **name**: the package name
                 * **version**: the package version
                 * **filename**: the package source tarball filename
                 * **sha1**: the package source tarball sha1 checksum
                 * **date**: the package release date
                 * **url**: the package source tarball download url
         """
         versions = {}
         if 'versions' in self.package_metadata:
             for version, data in self.package_metadata['versions'].items():
                 sha1 = data['dist']['shasum']
                 key = (version, sha1)
                 if known_versions and key in known_versions:
                     continue
                 tarball_url = data['dist']['tarball']
                 filename = os.path.basename(tarball_url)
                 date = self.package_metadata['time'][version]
                 versions[key] = {
                     'name': self.package,
                     'version': version,
                     'filename': filename,
                     'sha1': sha1,
                     'date': date,
                     'url': tarball_url
                 }
         return versions

     def prepare_package_versions(self, known_versions=None):
         """
         Instantiate a generator that will process a specific released
         version of the package at each iteration step. The following
         operations will be performed:

             1. Create a temporary directory to download and extract the
                release tarball
             2. Download the tarball
             3. Check downloaded tarball integrity
             4. Uncompress the tarball
             5. Parse the ``package.json`` file associated with the
                package version
             6. Extract author from the parsed ``package.json`` file

         Args:
             known_versions (dict): may be provided by the loader; it enables
                 filtering out versions already ingested in the archive.

         Yields:
             Tuple[dict, dict, dict, str]: tuples containing the following
             members:

                 * a dict holding the parsed ``package.json`` file
                 * a dict holding package author information
                 * a dict holding package tarball information
                 * a string holding the path of the uncompressed package to
                   load into the archive
         """
         new_versions = self.package_versions(known_versions)
         for version, package_source_data in sorted(new_versions.items()):
             # filter out versions with missing tarball (cases exist); the
             # package visit will be marked as partial at the end of
             # the loading process
             tarball_url = package_source_data['url']
             tarball_request = self._request(tarball_url, throw_error=False)
             if tarball_request.status_code == 404:
-                self.log.debug('Tarball url %s returns a 404 error.' %
+                self.log.debug('Tarball url %s returns a 404 error.',
                                tarball_url)
                 self.log.debug(('Version %s of %s package will be missing and '
-                                'the visit will be marked as partial.') %
-                               (version[0], self.package))
+                                'the visit will be marked as partial.'),
+                               version[0], self.package)
                 continue
             version_data = self.package_metadata['versions'][version[0]]
             yield self._prepare_package_version(package_source_data,
                                                 version_data)

     def _prepare_package_version(self, package_source_data, version_data):
         version = version_data['version']
-        self.log.debug('Processing version %s for npm package %s' %
-                       (version, self.package))
+        self.log.debug('Processing version %s for npm package %s',
+                       version, self.package)

         # create temp dir to download and extract package tarball
         path = os.path.join(self.temp_dir, version)
         os.makedirs(path, exist_ok=True)
         filepath = os.path.join(path, package_source_data['filename'])

         # download tarball
         url = package_source_data['url']
         response = self._request(url)
         hash_names = hashutil.DEFAULT_ALGORITHMS - {'sha1_git'}
         h = hashutil.MultiHash(hash_names=hash_names)
         with open(filepath, 'wb') as f:
             for chunk in response.iter_content(chunk_size=None):
                 h.update(chunk)
                 f.write(chunk)

         # check tarball integrity
         hashes = h.hexdigest()
         expected_digest = package_source_data['sha1']
         actual_digest = hashes['sha1']
         if actual_digest != expected_digest:
             raise ValueError(
                 '%s %s: Checksum mismatched: %s != %s' % (
                     self.package, version, expected_digest, actual_digest))

         # uncompress tarball
         tarball.uncompress(filepath, path)

         # remove tarball
         os.remove(filepath)

         # do not archive useless tarball root directory
         package_path = os.path.join(path, 'package')
         # some old packages use their name as root directory
         if not os.path.exists(package_path):
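+            # tarball filenames are expected to follow the
+            # '<package_name>-<version>.tgz' convention, so strip the
+            # version suffix and its leading '-' to get that root directory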
             ver_pos = package_source_data['filename'].rfind(version)
             package_name = package_source_data['filename'][:ver_pos-1]
             package_path = os.path.join(path, package_name)
         # fallback: archive root tarball directory
         if not os.path.exists(package_path):
             package_path = path

-        self.log.debug('Package local path: %s' % package_path)
+        self.log.debug('Package local path: %s', package_path)

         package_source_data.update(hashes)

         # parse package.json file to add its content to revision metadata
         package_json_path = os.path.join(package_path, 'package.json')
         package_json = {}
         with open(package_json_path, 'rb') as package_json_file:
             package_json_bytes = package_json_file.read()
             file_encoding = chardet.detect(package_json_bytes)['encoding']
             package_json = json.loads(package_json_bytes.decode(file_encoding))

         # extract author from package.json
         author = extract_npm_package_author(package_json)

         return (package_json, author, package_source_data, package_path)

     def _request(self, url, throw_error=True):
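+        # stream the response body so that package tarballs are written to
+        # disk chunk by chunk instead of being fully loaded in memory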
         response = self.session.get(url, **self.params, stream=True)
         if response.status_code != 200 and throw_error:
             raise ValueError("Failed to query '%s'. Reason: %s" % (
                 url, response.status_code))
         return response
diff --git a/swh/loader/npm/loader.py b/swh/loader/npm/loader.py
index ab52d10..8883f4e 100644
--- a/swh/loader/npm/loader.py
+++ b/swh/loader/npm/loader.py
@@ -1,318 +1,318 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import shutil

 from tempfile import mkdtemp
 from urllib.parse import quote

 from dateutil import parser as date_parser

 from swh.loader.core.utils import clean_dangling_folders
 from swh.loader.core.loader import BufferedLoader
 from swh.model.from_disk import Directory
 from swh.model.identifiers import (
     revision_identifier, snapshot_identifier,
     identifier_to_bytes, normalize_timestamp
 )
 from swh.storage.algos.snapshot import snapshot_get_all_branches

 from swh.loader.npm.client import NpmClient

 TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.npm.'


 class NpmLoader(BufferedLoader):
     """
     Loader for ingesting source packages from the npm registry into
     the Software Heritage archive.
     """

     CONFIG_BASE_FILENAME = 'loader/npm'
     ADDITIONAL_CONFIG = {
         'temp_directory': ('str', '/tmp/swh.loader.npm/'),
         'debug': ('bool', False)
     }

     def __init__(self):
         super().__init__(logging_class='swh.loader.npm.NpmLoader')
         self.origin_id = None

         temp_directory = self.config['temp_directory']
         os.makedirs(temp_directory, exist_ok=True)
         self.temp_directory = mkdtemp(suffix='-%s' % os.getpid(),
                                       prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                                       dir=temp_directory)
         self.debug = self.config['debug']
         self.done = False
         self.npm_client = NpmClient(self.temp_directory, self.log)

     def pre_cleanup(self):
         """
         To prevent disk explosion if some other workers exploded
         in mid-air (OOM killed), we try and clean up dangling files.
         """
         if self.debug:
-            self.log.warning('DEBUG: will not pre-clean up temp dir %s' %
+            self.log.warning('DEBUG: will not pre-clean up temp dir %s',
                              self.temp_directory)
             return
         clean_dangling_folders(self.config['temp_directory'],
                                pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
                                log=self.log)

     def cleanup(self):
         """
         Clean up temporary disk use after downloading and extracting
         npm source package tarballs.
         """
         if self.debug:
-            self.log.warning('DEBUG: will not clean up temp dir %s' %
+            self.log.warning('DEBUG: will not clean up temp dir %s',
                              self.temp_directory)
             return
         if os.path.exists(self.temp_directory):
-            self.log.debug('Clean up %s' % self.temp_directory)
+            self.log.debug('Clean up %s', self.temp_directory)
             shutil.rmtree(self.temp_directory)

     def load(self, package_name, package_url=None, package_metadata_url=None):
         """
         Loader entrypoint to ingest source tarballs for an npm package.

         Args:
             package_name (str): the name of the npm package
             package_url (str): the url of the package description,
                 if not provided the following one will be used:
                 https://www.npmjs.com/package/<package_name>
             package_metadata_url (str): the url for the package JSON metadata,
                 if not provided the following one will be used:
                 https://replicate.npmjs.com/<package_name>/
         """
         if package_url is None:
             package_url = 'https://www.npmjs.com/package/%s' % package_name
         if package_metadata_url is None:
             package_metadata_url = 'https://replicate.npmjs.com/%s/' %\
                 quote(package_name, safe='')
         return super().load(package_name, package_url, package_metadata_url)

     def prepare_origin_visit(self, package_name, package_url,
                              package_metadata_url):
         """
         Prepare npm package visit.

         Args:
             package_name (str): the name of the npm package
             package_url (str): the url of the package description
             package_metadata_url (str): the url for the package JSON metadata
         """
         # reset statuses
         self._load_status = 'uneventful'
         self._visit_status = 'full'
         self.done = False
         # fetch the npm package metadata from the registry
         self.npm_client.fetch_package_metadata(package_metadata_url)
         self.origin = {
             'url': package_url,
             'type': 'npm',
         }
         self.visit_date = None  # loader core will populate it

     def _known_versions(self, last_snapshot):
         """
         Retrieve the known release versions for the npm package
         (i.e. those already ingested into the archive).

         Args:
             last_snapshot (dict): Last snapshot for the visit

         Returns:
             dict: Dict whose keys are Tuple[version, sha1] and values
             are revision ids.
         """
         if not last_snapshot or 'branches' not in last_snapshot:
             return {}

         revs = [rev['target']
                 for rev in last_snapshot['branches'].values()
                 if rev and rev['target_type'] == 'revision']
         known_revisions = self.storage.revision_get(revs)
         ret = {}
         for revision in known_revisions:
             if not revision:
                 continue
             if 'package_source' in revision['metadata']:
                 package = revision['metadata']['package_source']
                 ret[(package['version'], package['sha1'])] = revision['id']
         return ret

     def _last_snapshot(self):
         """
         Retrieve the last snapshot of the npm package if any.
         """
         snapshot = self.storage.snapshot_get_latest(self.origin_id)
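+        # a 'next_branch' key means the branch listing is paginated;
+        # fetch the remaining branches before processing the snapshot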
         if snapshot and snapshot.pop('next_branch', None):
             snapshot = snapshot_get_all_branches(self.storage, snapshot['id'])
         return snapshot

     def prepare(self, package_name, package_url, package_metadata_url):
         """
         Prepare effective loading of source tarballs for an npm package.

         Args:
             package_name (str): the name of the npm package
             package_url (str): the url of the package description
             package_metadata_url (str): the url for the package JSON metadata
         """
         self.package_name = package_name
         self.origin_url = package_url

         self.package_contents = []
         self.package_directories = []
         self.package_revisions = []

         self.package_load_status = 'uneventful'
         self.package_visit_status = 'full'

         last_snapshot = self._last_snapshot()
         self.known_versions = self._known_versions(last_snapshot)
         self.new_versions = \
             self.npm_client.prepare_package_versions(self.known_versions)

     def fetch_data(self):
         """
         Called once per package release version to process.

         This will, for each call:

         - download a tarball associated with a package release version
         - uncompress it and compute the necessary information
         - compute the swh objects

         Returns:
             True as long as data to fetch exist
         """
         data = None
         if self.done:
             return False

         try:
             data = next(self.new_versions)
             self.package_load_status = 'eventful'
         except StopIteration:
             self.done = True
             return False

         package_metadata, author, package_source_data, dir_path = data
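+        # swh.model.from_disk expects filesystem paths as bytes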
         dir_path = dir_path.encode('utf-8')
         directory = Directory.from_disk(path=dir_path, data=True)
         objects = directory.collect()

         self.package_contents = objects['content'].values()
         self.package_directories = objects['directory'].values()

         date = date_parser.parse(package_source_data['date'])
         date = normalize_timestamp(int(date.timestamp()))

         message = package_source_data['version'].encode('ascii')

         revision = {
             'synthetic': True,
             'metadata': {
                 'package_source': package_source_data,
                 'package': package_metadata,
             },
             'author': author,
             'date': date,
             'committer': author,
             'committer_date': date,
             'message': message,
             'directory': directory.hash,
             'parents': [],
             'type': 'tar',
         }

         revision['id'] = identifier_to_bytes(revision_identifier(revision))
         self.package_revisions.append(revision)

         package_key = (package_source_data['version'],
                        package_source_data['sha1'])
         self.known_versions[package_key] = revision['id']

-        self.log.debug('Removing unpacked package files at %s' % dir_path)
+        self.log.debug('Removing unpacked package files at %s', dir_path)
         shutil.rmtree(dir_path)

         return not self.done

     def _target_from_version(self, version, sha1):
         """
         Return revision information if any for a given package version.
         """
         target = self.known_versions.get((version, sha1))
         return {
             'target': target,
             'target_type': 'revision',
         } if target else None

     def _generate_and_load_snapshot(self):
         """
         Generate snapshot for the npm package visit.
         """
         branches = {}
         latest_version = self.npm_client.latest_package_version()
         for version_data in self.npm_client.package_versions().values():
             version = version_data['version']
             sha1 = version_data['sha1']
             branch_name = ('releases/%s' % version).encode('ascii')
             target = self._target_from_version(version, sha1)
             branches[branch_name] = target
             if version == latest_version:
                 branches[b'HEAD'] = {
                     'target_type': 'alias',
                     'target': branch_name,
                 }
             if not target:
                 self.package_visit_status = 'partial'

         snapshot = {
             'branches': branches,
         }
         snapshot['id'] = identifier_to_bytes(snapshot_identifier(snapshot))
         self.maybe_load_snapshot(snapshot)

     def store_data(self):
         """
         Send collected objects to storage.
         """
         self.maybe_load_contents(self.package_contents)
         self.maybe_load_directories(self.package_directories)
         self.maybe_load_revisions(self.package_revisions)

         if self.done:
             self._generate_and_load_snapshot()
         self.flush()

     def load_status(self):
         return {
             'status': self.package_load_status,
         }

     def visit_status(self):
         return self.package_visit_status


 if __name__ == '__main__':
     import logging
     import sys
     logging.basicConfig(level=logging.DEBUG)

     if len(sys.argv) != 2:
         logging.error('Usage: %s <package_name>' % sys.argv[0])
         sys.exit(1)

     package_name = sys.argv[1]

     loader = NpmLoader()
     loader.load(package_name)