diff --git a/swh/loader/package/archive/loader.py b/swh/loader/package/archive/loader.py
index aa23794..e689dff 100644
--- a/swh/loader/package/archive/loader.py
+++ b/swh/loader/package/archive/loader.py
@@ -1,123 +1,126 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import iso8601
 import logging
 
 from os import path
 from typing import Any, Dict, Generator, Mapping, Optional, Sequence, Tuple
 
 from swh.loader.package.loader import PackageLoader
 from swh.loader.package.utils import release_name, artifact_identity
 from swh.model.identifiers import normalize_timestamp
 
 logger = logging.getLogger(__name__)
 SWH_PERSON = {
     'name': b'Software Heritage',
     'fullname': b'Software Heritage',
     'email': b'robot@softwareheritage.org'
 }
 REVISION_MESSAGE = b'swh-loader-package: synthetic revision message'
 
 
 class ArchiveLoader(PackageLoader):
     """Load archive origin's artifact files into swh archive
 
     """
     visit_type = 'tar'
 
     def __init__(self, url: str, artifacts: Sequence[Mapping[str, Any]],
                  identity_artifact_keys: Optional[Sequence[str]] = None):
         """Loader constructor.
 
         For now, this is the lister's task output.
 
         Args:
             url: Origin url
             artifacts: List of artifact information with keys:
 
-               **time**: last modification time as either isoformat date string
-               or timestamp
-               **url**: the artifact url to retrieve filename
-               **artifact's filename version**: artifact's version length
-               **length**: artifact's length
+               - **time**: last modification time as either isoformat date
+                 string or timestamp
+
+               - **url**: the artifact url to retrieve filename
+
+               - **artifact's filename version**: artifact's version length
+
+               - **length**: artifact's length
 
             identity_artifact_keys: Optional List of keys forming the
                 "identity" of an artifact
 
         """
         super().__init__(url=url)
         self.artifacts = artifacts  # assume order is enforced in the lister
         if not identity_artifact_keys:
             # default keys for gnu
             identity_artifact_keys = ['time', 'url', 'length', 'version']
         self.identity_artifact_keys = identity_artifact_keys
 
     def get_versions(self) -> Sequence[str]:
         versions = []
         for archive in self.artifacts:
             v = archive.get('version')
             if v:
                 versions.append(v)
         return versions
 
     def get_default_version(self) -> str:
         # It's the most recent, so for this loader, it's the last one
         return self.artifacts[-1]['version']
 
     def get_package_info(self, version: str) -> Generator[
             Tuple[str, Mapping[str, Any]], None, None]:
         for a_metadata in self.artifacts:
             url = a_metadata['url']
             package_version = a_metadata['version']
             if version == package_version:
                 filename = a_metadata.get('filename')
                 p_info = {
                     'url': url,
                     'filename': filename if filename else path.split(url)[-1],
                     'raw': a_metadata,
                 }
                 # FIXME: this code assumes we have only 1 artifact per
                 # versioned package
                 yield release_name(version), p_info
 
     def resolve_revision_from(
             self, known_artifacts: Dict, artifact_metadata: Dict) \
             -> Optional[bytes]:
         identity = artifact_identity(
             artifact_metadata, id_keys=self.identity_artifact_keys)
         for rev_id, known_artifact in known_artifacts.items():
             logging.debug('known_artifact: %s', known_artifact)
             reference_artifact = known_artifact['extrinsic']['raw']
             known_identity = artifact_identity(
                 reference_artifact, id_keys=self.identity_artifact_keys)
             if identity == known_identity:
                 return rev_id
         return None
 
     def build_revision(self, a_metadata: Mapping[str, Any],
                        uncompressed_path: str) -> Dict:
         time = a_metadata['time']  # assume it's a timestamp
         if isinstance(time, str):  # otherwise, assume it's a parsable date
             time = iso8601.parse_date(time)
         normalized_time = normalize_timestamp(time)
         return {
             'type': 'tar',
             'message': REVISION_MESSAGE,
             'date': normalized_time,
             'author': SWH_PERSON,
             'committer': SWH_PERSON,
             'committer_date': normalized_time,
             'parents': [],
             'metadata': {
                 'intrinsic': {},
                 'extrinsic': {
                     'provider': self.url,
                     'when': self.visit_date.isoformat(),
                     'raw': a_metadata,
                 },
             },
         }
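A minimal sketch of the identity matching that ArchiveLoader.resolve_revision_from performs. It assumes artifact_identity from swh.loader.package.utils simply projects the given id_keys out of a metadata dict; the stand-in below, the revision id and the artifact values are all illustrative:

    # hypothetical stand-in for swh.loader.package.utils.artifact_identity,
    # assumed here to project id_keys out of the metadata dict
    def artifact_identity(d, id_keys):
        return [d.get(k) for k in id_keys]

    id_keys = ['time', 'url', 'length', 'version']  # the gnu defaults above

    # a revision recorded by a previous visit (values illustrative)
    known_artifacts = {
        b'<rev-id>': {
            'extrinsic': {
                'raw': {'time': '2019-01-01T00:00:00+00:00',
                        'url': 'https://example.org/pkg-1.0.tar.gz',
                        'length': 42, 'version': '1.0'},
            },
        },
    }

    # an incoming artifact with the same identity resolves to that revision
    incoming = {'time': '2019-01-01T00:00:00+00:00',
                'url': 'https://example.org/pkg-1.0.tar.gz',
                'length': 42, 'version': '1.0'}
    reference = known_artifacts[b'<rev-id>']['extrinsic']['raw']
    assert artifact_identity(incoming, id_keys) == \
        artifact_identity(reference, id_keys)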
diff --git a/swh/loader/package/npm/loader.py b/swh/loader/package/npm/loader.py
index 6ae7669..2a48bfe 100644
--- a/swh/loader/package/npm/loader.py
+++ b/swh/loader/package/npm/loader.py
@@ -1,267 +1,267 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import json
 import logging
 import os
 
 from codecs import BOM_UTF8
 from typing import Any, Dict, Generator, Mapping, Sequence, Tuple, Optional
 
 import chardet
 import iso8601
 
 from urllib.parse import quote
 
 from swh.model.identifiers import normalize_timestamp
 from swh.loader.package.loader import PackageLoader
 from swh.loader.package.utils import (
     api_info, release_name, parse_author, swh_author
 )
 
 logger = logging.getLogger(__name__)
 
 
 class NpmLoader(PackageLoader):
     """Load npm origin's artifact releases into swh archive.
 
     """
     visit_type = 'npm'
 
     def __init__(self, url: str):
         """Constructor
 
         Args
             str: origin url (e.g. https://www.npmjs.com/package/<package-name>)
         """
         super().__init__(url=url)
         package_name = url.split('https://www.npmjs.com/package/')[1]
         safe_name = quote(package_name, safe='')
         self.provider_url = f'https://replicate.npmjs.com/{safe_name}/'
         self._info: Dict[str, Any] = {}
         self._versions = None
 
     @property
     def info(self) -> Dict[str, Any]:
         """Return the project metadata information (fetched from npm registry)
 
         """
         if not self._info:
             self._info = api_info(self.provider_url)
         return self._info
 
     def get_versions(self) -> Sequence[str]:
         return sorted(list(self.info['versions'].keys()))
 
     def get_default_version(self) -> str:
         return self.info['dist-tags'].get('latest', '')
 
     def get_package_info(self, version: str) -> Generator[
             Tuple[str, Mapping[str, Any]], None, None]:
         meta = self.info['versions'][version]
         url = meta['dist']['tarball']
         p_info = {
             'url': url,
             'filename': os.path.basename(url),
             'raw': meta,
         }
         yield release_name(version), p_info
 
     def resolve_revision_from(
             self, known_artifacts: Dict, artifact_metadata: Dict) \
             -> Optional[bytes]:
         return artifact_to_revision_id(known_artifacts, artifact_metadata)
 
     def build_revision(
             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
         i_metadata = extract_intrinsic_metadata(uncompressed_path)
 
         # from intrinsic metadata
         author = extract_npm_package_author(i_metadata)
         message = i_metadata['version'].encode('ascii')
 
         # from extrinsic metadata
         # No date available in intrinsic metadata: retrieve it from the API
         # metadata, using the version number that the API claims this package
         # has.
         extrinsic_version = a_metadata['version']
         date = self.info['time'][extrinsic_version]
         date = iso8601.parse_date(date)
         date = normalize_timestamp(int(date.timestamp()))
 
         return {
             'type': 'tar',
             'message': message,
             'author': author,
             'date': date,
             'committer': author,
             'committer_date': date,
             'parents': [],
             'metadata': {
                 'intrinsic': {
                     'tool': 'package.json',
                     'raw': i_metadata,
                 },
                 'extrinsic': {
                     'provider': self.provider_url,
                     'when': self.visit_date.isoformat(),
                     'raw': a_metadata,
                 },
             },
         }
 
 
 def artifact_to_revision_id(
         known_artifacts: Dict, artifact_metadata: Dict) -> Optional[bytes]:
     """Given metadata artifact, solves the associated revision id.
 
     The following code allows to deal with 2 metadata formats:
 
-    - old format sample:
+    - old format sample::
 
         {
             'package_source': {
                 'sha1': '05181c12cd8c22035dd31155656826b85745da37',
             }
         }
 
-    - new format sample:
+    - new format sample::
 
         {
             'original_artifact': [{
                 'checksums': {
-                    'sha256': "6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec",  # noqa
+                    'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec',  # noqa
                     ...
                 },
             }],
             ...
         }
 
     """
     shasum = artifact_metadata['dist']['shasum']
     for rev_id, known_artifact in known_artifacts.items():
         known_original_artifact = known_artifact.get('original_artifact')
         if not known_original_artifact:
             # previous loader-npm version kept original artifact elsewhere
             known_original_artifact = known_artifact.get('package_source')
             if not known_original_artifact:
                 continue
             original_hash = known_original_artifact['sha1']
         else:
             assert isinstance(known_original_artifact, list)
             original_hash = known_original_artifact[0]['checksums']['sha1']
         if shasum == original_hash:
             return rev_id
     return None
 
 
 def extract_npm_package_author(package_json):
     """
     Extract package author from a ``package.json`` file content and
     return it in swh format.
 
     Args:
         package_json (dict): Dict holding the content of parsed
             ``package.json`` file
 
     Returns:
         dict: A dict with the following keys:
             * fullname
             * name
             * email
     """
     def _author_str(author_data):
         if type(author_data) is dict:
             author_str = ''
             if 'name' in author_data:
                 author_str += author_data['name']
             if 'email' in author_data:
                 author_str += ' <%s>' % author_data['email']
             return author_str
         elif type(author_data) is list:
             return _author_str(author_data[0]) if len(author_data) > 0 else ''
         else:
             return author_data
 
     author_data = {}
     for author_key in ('author', 'authors'):
         if author_key in package_json:
             author_str = _author_str(package_json[author_key])
             author_data = parse_author(author_str)
 
     return swh_author(author_data)
 
 
 def _lstrip_bom(s, bom=BOM_UTF8):
     if s.startswith(bom):
         return s[len(bom):]
     else:
         return s
 
 
 def load_json(json_bytes):
     """
     Try to load JSON from bytes and return a dictionary.
 
     First try to decode from utf-8. If the decoding failed,
     try to detect the encoding and decode again with replace
     error handling.
 
     If JSON is malformed, an empty dictionary will be returned.
 
     Args:
         json_bytes (bytes): binary content of a JSON file
 
     Returns:
         dict: JSON data loaded in a dictionary
     """
     json_data = {}
     try:
         json_str = _lstrip_bom(json_bytes).decode('utf-8')
     except UnicodeDecodeError:
         encoding = chardet.detect(json_bytes)['encoding']
         if encoding:
             json_str = json_bytes.decode(encoding, 'replace')
     try:
         json_data = json.loads(json_str)
     except json.decoder.JSONDecodeError:
         pass
     return json_data
 
 
 def extract_intrinsic_metadata(dir_path: str) -> Dict:
     """Given an uncompressed path holding the pkginfo file, returns a
        pkginfo parsed structure as a dict.
 
        The release artifact contains at their root one folder. For example:
 
        $ tar tvf zprint-0.0.6.tar.gz
        drwxr-xr-x root/root         0 2018-08-22 11:01 zprint-0.0.6/
        ...
 
     Args:
         dir_path (str): Path to the uncompressed directory
             representing a release artifact from npm.
 
     Returns:
         the pkginfo parsed structure as a dict if any or None if
         none was present.
 
     """
     # Retrieve the root folder of the archive
     if not os.path.exists(dir_path):
         return {}
     lst = os.listdir(dir_path)
     if len(lst) == 0:
         return {}
     project_dirname = lst[0]
     package_json_path = os.path.join(dir_path, project_dirname, 'package.json')
     if not os.path.exists(package_json_path):
         return {}
     with open(package_json_path, 'rb') as package_json_file:
         package_json_bytes = package_json_file.read()
         return load_json(package_json_bytes)
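load_json above is deliberately forgiving: it strips a UTF-8 BOM, falls back to chardet detection with replace error handling when the UTF-8 decode fails, and swallows malformed JSON. A short sketch of that behavior with illustrative inputs; note that if both the UTF-8 decode and the chardet detection fail, json_str is never bound and the final json.loads call raises NameError instead of returning the empty dict:

    from codecs import BOM_UTF8

    # the UTF-8 BOM is stripped before decoding
    raw = BOM_UTF8 + b'{"name": "zprint", "version": "0.0.6"}'
    assert load_json(raw) == {'name': 'zprint', 'version': '0.0.6'}

    # malformed JSON degrades to an empty dict instead of raising
    assert load_json(b'{"name": ') == {}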
diff --git a/swh/loader/package/pypi/loader.py b/swh/loader/package/pypi/loader.py
index c0ec2dc..37415cc 100644
--- a/swh/loader/package/pypi/loader.py
+++ b/swh/loader/package/pypi/loader.py
@@ -1,245 +1,245 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import logging
 
 from typing import Any, Dict, Generator, Mapping, Optional, Sequence, Tuple
 from urllib.parse import urlparse
 
 from pkginfo import UnpackedSDist
 import iso8601
 
 from swh.model.identifiers import normalize_timestamp
 from swh.loader.package.loader import PackageLoader
 from swh.loader.package.utils import api_info, release_name
 
 logger = logging.getLogger(__name__)
 
 
 class PyPILoader(PackageLoader):
     """Load pypi origin's artifact releases into swh archive.
 
     """
     visit_type = 'pypi'
 
     def __init__(self, url):
         super().__init__(url=url)
         self._info = None
         self.provider_url = pypi_api_url(self.url)
 
     @property
     def info(self) -> Dict:
         """Return the project metadata information (fetched from pypi registry)
 
         """
         if not self._info:
             self._info = api_info(self.provider_url)
         return self._info
 
     def get_versions(self) -> Sequence[str]:
         return self.info['releases'].keys()
 
     def get_default_version(self) -> str:
         return self.info['info']['version']
 
     def get_package_info(self, version: str) -> Generator[
             Tuple[str, Mapping[str, Any]], None, None]:
         res = []
         for meta in self.info['releases'][version]:
             if meta['packagetype'] != 'sdist':
                 continue
             filename = meta['filename']
             p_info = {
                 'url': meta['url'],
                 'filename': filename,
                 'raw': meta,
             }
             res.append((version, p_info))
 
         if len(res) == 1:
             version, p_info = res[0]
             yield release_name(version), p_info
         else:
             for version, p_info in res:
                 yield release_name(version, p_info['filename']), p_info
 
     def resolve_revision_from(
             self, known_artifacts: Dict, artifact_metadata: Dict) \
             -> Optional[bytes]:
         return artifact_to_revision_id(known_artifacts, artifact_metadata)
 
     def build_revision(
             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
         i_metadata = extract_intrinsic_metadata(uncompressed_path)
 
         # from intrinsic metadata
         name = i_metadata['version']
         _author = author(i_metadata)
 
         # from extrinsic metadata
         message = a_metadata.get('comment_text', '')
         message = '%s: %s' % (name, message) if message else name
         date = normalize_timestamp(
             int(iso8601.parse_date(a_metadata['upload_time']).timestamp()))
 
         return {
             'type': 'tar',
             'message': message.encode('utf-8'),
             'author': _author,
             'date': date,
             'committer': _author,
             'committer_date': date,
             'parents': [],
             'metadata': {
                 'intrinsic': {
                     'tool': 'PKG-INFO',
                     'raw': i_metadata,
                 },
                 'extrinsic': {
                     'provider': self.provider_url,
                     'when': self.visit_date.isoformat(),
                     'raw': a_metadata,
                 },
             }
         }
 
 
 def artifact_to_revision_id(
         known_artifacts: Dict, artifact_metadata: Dict) -> Optional[bytes]:
     """Given metadata artifact, solves the associated revision id.
 
     The following code allows to deal with 2 metadata formats (column
     metadata in 'revision')
 
-    - old format sample:
+    - old format sample::
 
         {
             'original_artifact': {
-                'sha256': "6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec",  # noqa
+                'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec',  # noqa
                 ...
             },
             ...
         }
 
-    - new format sample:
+    - new format sample::
 
         {
             'original_artifact': [{
                 'checksums': {
-                    'sha256': "6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec",  # noqa
+                    'sha256': '6975816f2c5ad4046acc676ba112f2fff945b01522d63948531f11f11e0892ec',  # noqa
                     ...
                 },
             }],
             ...
         }
 
     """
     sha256 = artifact_metadata['digests']['sha256']
     for rev_id, known_artifact in known_artifacts.items():
         original_artifact = known_artifact['original_artifact']
         if isinstance(original_artifact, dict):
             # previous loader-pypi version stored metadata as dict
             original_sha256 = original_artifact['sha256']
             if sha256 == original_sha256:
                 return rev_id
             continue
         # new pypi loader actually store metadata dict differently...
         assert isinstance(original_artifact, list)
         # current loader-pypi stores metadata as list of dict
         for original_artifact in known_artifact['original_artifact']:
             if sha256 == original_artifact['checksums']['sha256']:
                 return rev_id
     return None
 
 
 def pypi_api_url(url: str) -> str:
     """Compute api url from a project url
 
     Args:
         url (str): PyPI instance's url (e.g: https://pypi.org/project/requests)
         This deals with correctly transforming the project's api url (e.g
         https://pypi.org/pypi/requests/json)
 
     Returns:
         api url
 
     """
     p_url = urlparse(url)
     project_name = p_url.path.rstrip('/').split('/')[-1]
     url = '%s://%s/pypi/%s/json' % (p_url.scheme, p_url.netloc, project_name)
     return url
 
 
 def extract_intrinsic_metadata(dir_path: str) -> Dict:
     """Given an uncompressed path holding the pkginfo file, returns a
        pkginfo parsed structure as a dict.
 
        The release artifact contains at their root one folder. For example:
 
        $ tar tvf zprint-0.0.6.tar.gz
        drwxr-xr-x root/root         0 2018-08-22 11:01 zprint-0.0.6/
        ...
 
     Args:
         dir_path (str): Path to the uncompressed directory
             representing a release artifact from pypi.
 
     Returns:
         the pkginfo parsed structure as a dict if any or None if
         none was present.
 
     """
     # Retrieve the root folder of the archive
     if not os.path.exists(dir_path):
         return {}
     lst = os.listdir(dir_path)
     if len(lst) != 1:
         return {}
     project_dirname = lst[0]
     pkginfo_path = os.path.join(dir_path, project_dirname, 'PKG-INFO')
     if not os.path.exists(pkginfo_path):
         return {}
     pkginfo = UnpackedSDist(pkginfo_path)
     raw = pkginfo.__dict__
     raw.pop('filename')  # this gets added with the ondisk location
     return raw
 
 
 def author(data: Dict) -> Dict:
     """Given a dict of project/release artifact information (coming from
        PyPI), returns an author subset.
 
     Args:
         data (dict): Representing either artifact information or
             release information.
 
     Returns:
         swh-model dict representing a person.
 
     """
     name = data.get('author')
     email = data.get('author_email')
 
     fullname = None  # type: Optional[str]
     if email:
         fullname = '%s <%s>' % (name, email)
     else:
         fullname = name
 
     if not fullname:
         return {'fullname': b'', 'name': None, 'email': None}
 
     if name is not None:
         name = name.encode('utf-8')
 
     if email is not None:
         email = email.encode('utf-8')
 
     return {
         'fullname': fullname.encode('utf-8'),
         'name': name,
         'email': email
     }
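A quick sketch of the url mapping pypi_api_url performs, with illustrative project urls; the rstrip('/') makes a trailing slash harmless:

    assert pypi_api_url('https://pypi.org/project/requests') == \
        'https://pypi.org/pypi/requests/json'
    assert pypi_api_url('https://pypi.org/project/requests/') == \
        'https://pypi.org/pypi/requests/json'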
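And a sketch of author()'s two paths, again with illustrative values: a populated author/author_email pair is rendered as 'name <email>' and utf-8 encoded, while missing data falls back to a blank person:

    assert author({'author': 'Jane Doe',
                   'author_email': 'jane@example.org'}) == {
        'fullname': b'Jane Doe <jane@example.org>',
        'name': b'Jane Doe',
        'email': b'jane@example.org',
    }
    assert author({}) == {'fullname': b'', 'name': None, 'email': None}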