diff --git a/swh/loader/package/archive/loader.py b/swh/loader/package/archive/loader.py index 4d32228..e02467f 100644 --- a/swh/loader/package/archive/loader.py +++ b/swh/loader/package/archive/loader.py @@ -1,136 +1,139 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import iso8601 import logging from os import path from typing import Any, Dict, Generator, Mapping, Optional, Sequence, Tuple from swh.loader.package.loader import PackageLoader from swh.loader.package.utils import release_name from swh.model.identifiers import normalize_timestamp logger = logging.getLogger(__name__) SWH_PERSON = { 'name': b'Software Heritage', 'fullname': b'Software Heritage', 'email': b'robot@softwareheritage.org' } REVISION_MESSAGE = b'swh-loader-package: synthetic revision message' class ArchiveLoader(PackageLoader): + """Load archive origin's artifact files into swh archive + + """ visit_type = 'tar' def __init__(self, url: str, artifacts: Sequence[Mapping[str, Any]], identity_artifact_keys: Optional[Sequence[str]] = None): """Loader constructor. For now, this is the lister's task output. Args: url: Origin url artifacts: List of artifact information with keys: **time**: last modification time as either isoformat date string or timestamp **url**: the artifact url to retrieve filename **artifact's filename version**: artifact's version length **length**: artifact's length identity_artifact_keys: Optional List of keys forming the "identity" of an artifact """ super().__init__(url=url) self.artifacts = artifacts # assume order is enforced in the lister if not identity_artifact_keys: # default keys for gnu identity_artifact_keys = ['time', 'url', 'length', 'version'] self.identity_artifact_keys = identity_artifact_keys def get_versions(self) -> Sequence[str]: versions = [] for archive in self.artifacts: v = archive.get('version') if v: versions.append(v) return versions def get_default_version(self) -> str: # It's the most recent, so for this loader, it's the last one return self.artifacts[-1]['version'] def get_package_info(self, version: str) -> Generator[ Tuple[str, Mapping[str, Any]], None, None]: for a_metadata in self.artifacts: url = a_metadata['url'] package_version = a_metadata['version'] if version == package_version: filename = a_metadata.get('filename') p_info = { 'url': url, 'filename': filename if filename else path.split(url)[-1], 'raw': a_metadata, } # FIXME: this code assumes we have only 1 artifact per # versioned package yield release_name(version), p_info def resolve_revision_from( self, known_artifacts: Dict, artifact_metadata: Dict) \ -> Optional[bytes]: identity = artifact_identity( artifact_metadata, id_keys=self.identity_artifact_keys) for rev_id, known_artifact in known_artifacts.items(): logging.debug('known_artifact: %s', known_artifact) reference_artifact = known_artifact['extrinsic']['raw'] known_identity = artifact_identity( reference_artifact, id_keys=self.identity_artifact_keys) if identity == known_identity: return rev_id return None def build_revision(self, a_metadata: Mapping[str, Any], uncompressed_path: str) -> Dict: time = a_metadata['time'] # assume it's a timestamp if isinstance(time, str): # otherwise, assume it's a parsable date time = iso8601.parse_date(time) normalized_time = normalize_timestamp(time) return { 'type': 'tar', 'message': REVISION_MESSAGE, 'date': normalized_time, 'author': SWH_PERSON, 'committer': SWH_PERSON, 'committer_date': normalized_time, 'parents': [], 'metadata': { 'intrinsic': {}, 'extrinsic': { 'provider': self.url, 'when': self.visit_date.isoformat(), 'raw': a_metadata, }, }, } def artifact_identity(d: Mapping[str, Any], id_keys: Sequence[str]) -> Sequence[Any]: """Compute the primary key for a dict using the id_keys as primary key composite. Args: d: A dict entry to compute the primary key on id_keys: Sequence of keys to use as primary key Returns: The identity for that dict entry """ return [d.get(k) for k in id_keys] diff --git a/swh/loader/package/npm/loader.py b/swh/loader/package/npm/loader.py index e9ed096..2569946 100644 --- a/swh/loader/package/npm/loader.py +++ b/swh/loader/package/npm/loader.py @@ -1,341 +1,344 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging import os import re from codecs import BOM_UTF8 from typing import Any, Dict, Generator, Mapping, Sequence, Tuple, Optional import chardet import iso8601 from urllib.parse import quote from swh.model.identifiers import normalize_timestamp from swh.loader.package.loader import PackageLoader from swh.loader.package.utils import api_info, release_name logger = logging.getLogger(__name__) _EMPTY_AUTHOR = {'fullname': b'', 'name': None, 'email': None} # https://github.com/jonschlinkert/author-regex _author_regexp = r'([^<(]+?)?[ \t]*(?:<([^>(]+?)>)?[ \t]*(?:\(([^)]+?)\)|$)' class NpmLoader(PackageLoader): + """Load npm origin's artifact releases into swh archive. + + """ visit_type = 'npm' def __init__(self, url: str): """Constructor Args str: origin url (e.g. https://www.npmjs.com/package/) """ super().__init__(url=url) package_name = url.split('https://www.npmjs.com/package/')[1] safe_name = quote(package_name, safe='') self.provider_url = f'https://replicate.npmjs.com/{safe_name}/' self._info: Dict[str, Any] = {} self._versions = None @property def info(self) -> Dict[str, Any]: """Return the project metadata information (fetched from npm registry) """ if not self._info: self._info = api_info(self.provider_url) return self._info def get_versions(self) -> Sequence[str]: return sorted(list(self.info['versions'].keys())) def get_default_version(self) -> str: return self.info['dist-tags'].get('latest', '') def get_package_info(self, version: str) -> Generator[ Tuple[str, Mapping[str, Any]], None, None]: meta = self.info['versions'][version] url = meta['dist']['tarball'] p_info = { 'url': url, 'filename': os.path.basename(url), 'raw': meta, } yield release_name(version), p_info def resolve_revision_from( self, known_artifacts: Dict, artifact_metadata: Dict) \ -> Optional[bytes]: return artifact_to_revision_id(known_artifacts, artifact_metadata) def build_revision( self, a_metadata: Dict, uncompressed_path: str) -> Dict: i_metadata = extract_intrinsic_metadata(uncompressed_path) # from intrinsic metadata author = extract_npm_package_author(i_metadata) message = i_metadata['version'].encode('ascii') # from extrinsic metadata # No date available in intrinsic metadata: retrieve it from the API # metadata, using the version number that the API claims this package # has. extrinsic_version = a_metadata['version'] date = self.info['time'][extrinsic_version] date = iso8601.parse_date(date) date = normalize_timestamp(int(date.timestamp())) return { 'type': 'tar', 'message': message, 'author': author, 'date': date, 'committer': author, 'committer_date': date, 'parents': [], 'metadata': { 'intrinsic': { 'tool': 'package.json', 'raw': i_metadata, }, 'extrinsic': { 'provider': self.provider_url, 'when': self.visit_date.isoformat(), 'raw': a_metadata, }, }, } def artifact_to_revision_id( known_artifacts: Dict, artifact_metadata: Dict) -> Optional[bytes]: """Given metadata artifact, solves the associated revision id. The following code allows to deal with 2 metadata formats: - old format sample: { 'package_source': { 'sha1': '05181c12cd8c22035dd31155656826b85745da37', } } - new format sample: { 'original_artifact': [{ 'checksums': { 'sha256': "6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec", # noqa ... }, }], ... } """ shasum = artifact_metadata['dist']['shasum'] for rev_id, known_artifact in known_artifacts.items(): known_original_artifact = known_artifact.get('original_artifact') if not known_original_artifact: # previous loader-npm version kept original artifact elsewhere known_original_artifact = known_artifact.get('package_source') if not known_original_artifact: continue original_hash = known_original_artifact['sha1'] else: assert isinstance(known_original_artifact, list) original_hash = known_original_artifact[0]['checksums']['sha1'] if shasum == original_hash: return rev_id return None def parse_npm_package_author(author_str): """ Parse npm package author string. It works with a flexible range of formats, as detailed below:: name name (url) name (url) name (url) name(url) name (url) name (url) name(url) name(url) name (url) name(url) name name (url) (url) (url) (url) (url) Args: author_str (str): input author string Returns: dict: A dict that may contain the following keys: * name * email * url """ author = {} matches = re.findall(_author_regexp, author_str.replace('<>', '').replace('()', ''), re.M) for match in matches: if match[0].strip(): author['name'] = match[0].strip() if match[1].strip(): author['email'] = match[1].strip() if match[2].strip(): author['url'] = match[2].strip() return author def extract_npm_package_author(package_json): """ Extract package author from a ``package.json`` file content and return it in swh format. Args: package_json (dict): Dict holding the content of parsed ``package.json`` file Returns: dict: A dict with the following keys: * fullname * name * email """ def _author_str(author_data): if type(author_data) is dict: author_str = '' if 'name' in author_data: author_str += author_data['name'] if 'email' in author_data: author_str += ' <%s>' % author_data['email'] return author_str elif type(author_data) is list: return _author_str(author_data[0]) if len(author_data) > 0 else '' else: return author_data author_data = {} for author_key in ('author', 'authors'): if author_key in package_json: author_str = _author_str(package_json[author_key]) author_data = parse_npm_package_author(author_str) name = author_data.get('name') email = author_data.get('email') fullname = None if name and email: fullname = '%s <%s>' % (name, email) elif name: fullname = name if not fullname: return _EMPTY_AUTHOR if fullname: fullname = fullname.encode('utf-8') if name: name = name.encode('utf-8') if email: email = email.encode('utf-8') return {'fullname': fullname, 'name': name, 'email': email} def _lstrip_bom(s, bom=BOM_UTF8): if s.startswith(bom): return s[len(bom):] else: return s def load_json(json_bytes): """ Try to load JSON from bytes and return a dictionary. First try to decode from utf-8. If the decoding failed, try to detect the encoding and decode again with replace error handling. If JSON is malformed, an empty dictionary will be returned. Args: json_bytes (bytes): binary content of a JSON file Returns: dict: JSON data loaded in a dictionary """ json_data = {} try: json_str = _lstrip_bom(json_bytes).decode('utf-8') except UnicodeDecodeError: encoding = chardet.detect(json_bytes)['encoding'] if encoding: json_str = json_bytes.decode(encoding, 'replace') try: json_data = json.loads(json_str) except json.decoder.JSONDecodeError: pass return json_data def extract_intrinsic_metadata(dir_path: str) -> Dict: """Given an uncompressed path holding the pkginfo file, returns a pkginfo parsed structure as a dict. The release artifact contains at their root one folder. For example: $ tar tvf zprint-0.0.6.tar.gz drwxr-xr-x root/root 0 2018-08-22 11:01 zprint-0.0.6/ ... Args: dir_path (str): Path to the uncompressed directory representing a release artifact from npm. Returns: the pkginfo parsed structure as a dict if any or None if none was present. """ # Retrieve the root folder of the archive if not os.path.exists(dir_path): return {} lst = os.listdir(dir_path) if len(lst) == 0: return {} project_dirname = lst[0] package_json_path = os.path.join(dir_path, project_dirname, 'package.json') if not os.path.exists(package_json_path): return {} with open(package_json_path, 'rb') as package_json_file: package_json_bytes = package_json_file.read() return load_json(package_json_bytes)