diff --git a/requirements.txt b/requirements.txt
index 36f72d8..46390fd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec of
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 vcversioner
 retrying
 psutil
 requests
+iso8601
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index 94c9fee..08ac652 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -1,278 +1,240 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+import tempfile
+import os
+
+from typing import Generator, Dict, Tuple, Sequence
+
+from swh.core.tarball import uncompress
 from swh.core.config import SWHConfig
 from swh.model.from_disk import Directory
 from swh.model.identifiers import (
     revision_identifier, snapshot_identifier,
-    identifier_to_bytes, normalize_timestamp
+    identifier_to_bytes
 )
 from swh.storage import get_storage


 class PackageLoader:
-    visit = None
-    """origin visit attribute (dict) set at the beginning of the load method
-
-    """
+    # Origin visit type (str) set by the loader
     visit_type = None
-    """Origin visit type (str) set by the loader
-
-    """

     def __init__(self, url):
         """Loader's constructor. This raises an exception if the minimal
         required configuration is missing (cf. :func:`_check_configuration`).

         Args:
             url (str): Origin url to load data from

         """
         self.config = SWHConfig.parse_config_file()
         self._check_configuration()
         self.storage = get_storage(**self.config['storage'])
         # FIXME: No more configuration documentation
         # Implicitly, this uses the SWH_CONFIG_FILENAME environment variable
         # loading mechanism
         # FIXME: Prepare temp folder to uncompress archives

         self.origin = {'url': url}

     def _check_configuration(self):
         """Checks that the minimal required configuration is set for the
         loader. If some required configuration is missing, an exception
         detailing the issue is raised.

         """
-        if not 'storage' in self.config:
+        if 'storage' not in self.config:
             raise ValueError(
                 'Misconfiguration, at least the storage key should be set')

-    def get_versions(self):
+    def get_versions(self) -> Sequence[str]:
         """Return the list of all published package versions.

+        Returns:
+            Sequence of published versions
+
         """
         return []

-    def retrieve_artifacts(self, version):
+    def get_artifacts(self, version: str) -> Generator[
+            Tuple[str, str, Dict], None, None]:
         """Given a release version of a package, retrieve the associated
-        artifact for such version.
+        artifact information for such version.

         Args:
-            version (str): Package version
+            version: Package version

         Returns:
-            xxx
+            Tuples of (artifact filename, artifact uri, raw artifact
+            metadata)

         """
-        pass
+        return []

-    def fetch_and_uncompress_artifact_archive(self, artifact_archive_path):
-        """Uncompress artifact archive to a temporary folder and returns its
+    def fetch_artifact_archive(
+            self, artifact_archive_path: str, dest: str) -> Tuple[str, Dict]:
+        """Fetch the artifact archive into the `dest` folder and return its
         path.
         Args:
-            artifact_archive_path (str): Path to artifact archive to uncompress
+            artifact_archive_path: Path to artifact archive to uncompress
+            dest: Directory to write the downloaded archive to

         Returns:
-            the uncompressed artifact path (str)
+            Tuple of (local artifact path, hashes computed on the artifact)

         """
         pass

-    def get_project_metadata(self, artifact):
-        """Given an artifact dict, extract the relevant project metadata.
-        Those will be set within the revision's metadata built
-
-        Args:
-            artifact (dict): A dict of metadata about a release artifact.
+    def build_revision(self, artifact_uncompressed_path: str) -> Dict:
+        """Build the revision dict from the artifact's uncompressed sources

         Returns:
-            dict of relevant project metadata (e.g, in pypi loader:
-            {'project_info': {...}})
+            SWH revision dict

         """
         return {}

-    def get_revision_metadata(self, artifact):
-        """Given an artifact dict, extract the relevant revision metadata.
-        Those will be set within the 'name' (bytes) and 'message' (bytes)
-        built revision fields.
-
-        Args:
-            artifact (dict): A dict of metadata about a release artifact.
-
-        Returns:
-            dict of relevant revision metadata (name, message keys with values
-            as bytes)
-
-        """
-        pass
-
-    def get_revision_parents(self, version, artifact):
-        """Build the revision parents history if any
-
-        Args:
-            version (str): A version string as string (e.g. "0.0.1")
-            artifact (dict): A dict of metadata about a release artifact.
-
-        Returns:
-            List of revision ids representing the new revision's parents.
-
-        """
-        return []
-
     def load(self):
         """Load the contents associated with a specific origin.

         for each package version of the origin

         1. Fetch the files for one package version. By default, this can be
            implemented as a simple HTTP request. Loaders with more specific
            requirements can override this, e.g.: the PyPI loader checks the
            integrity of the downloaded files; the Debian loader has to
            download and check several files for one package version.

         2. Extract the downloaded files. By default, this would be a
            universal archive/tarball extraction. Loaders for specific
            formats can override this method (for instance, the Debian
            loader uses dpkg-source -x).

         3. Convert the extracted directory to a set of Software Heritage
            objects, using swh.model.from_disk.

         4. Extract the metadata from the unpacked directories. This would
            only be applicable for "smart" loaders like npm (parsing the
            package.json), PyPI (parsing the PKG-INFO file) or Debian
            (parsing debian/changelog and debian/control). On
            "minimal-metadata" sources such as the GNU archive, the lister
            should provide the minimal set of metadata needed to populate
            the revision/release objects (authors, dates) as an argument to
            the task.

         5. Generate the revision/release objects for the given version,
            from the data generated at steps 3 and 4.

         end for each

         6. Generate and load the snapshot for the visit. Using the
            revisions/releases collected at step 5., and the branch
            information from step 0., generate a snapshot and load it into
            the Software Heritage archive.

         """
         status_load = 'uneventful'  # either: eventful, uneventful, failed
         status_visit = 'partial'  # either: partial, full
         tmp_revisions = {}

         # Prepare origin and origin_visit (method?)
         origin = self.storage.origin_add([self.origin])[0]
         visit = self.storage.origin_visit_add(
             origin=origin, type=self.visit_type)['visit']

         # Retrieve the default release (the "latest" one)
         default_release = self.get_default_release()

         # FIXME: Add load exceptions handling
         for version in self.get_versions():  # for each
             tmp_revisions[version] = []
-            for artifact in self.retrieve_artifacts(version):  # 1.
-                artifact_version = artifact.get('version')
-                if artifact_version is None:
-                    artifact['version'] = version
-
-                artifact_path = self.fetch_and_uncompress_artifact_archive(
-                    artifact['uri'])  # 2.
-
-                # 3. Collect directory information
-                directory = Directory.from_disk(path=artifact_path, data=True)
-                # FIXME: Try not to load the full raw content in memory
-                objects = directory.collect()
-
-                contents = objects['content'].values()
-                self.storage.content_add(contents)
-
-                status_load = 'eventful'
-                directories = objects['directory'].values()
-                self.storage.directory_add(directories)
-
-                # 4. Parse metadata (project, artifact metadata)
-                metadata = self.get_revision_metadata(artifact)
-
-                # 5. Build revision
-                name = metadata['name']
-                message = metadata['message']
-                message = b'%s: %s' % (name, message) if message else name
-
-                revision = {
-                    'synthetic': True,
-                    'metadata': {
-                        'original_artifact': artifact,
-                        **self.get_project_metadata(artifact),
-                    },
-                    'author': metadata['author'],
-                    'date': metadata['date'],
-                    'committer': metadata['author'],
-                    'committer_date': metadata['date'],
-                    'message': message,
-                    'directory': directory.hash,
-                    'parents': self.get_revision_parents(version, artifact),
-                    'type': 'tar',
-                }
-
-                revision['id'] = identifier_to_bytes(
-                    revision_identifier(revision))
-                self.storage.revision_add(revision)
-
-                tmp_revisions[version].append[{
-                    'filename': artifact['name'],
-                    'target': revision['id'],
-                }]
-
-        # 6. Build and load the snapshot
+            # `a_` stands for `artifact_`
+            for a_filename, a_uri, a_metadata in self.get_artifacts(version):
+                with tempfile.TemporaryDirectory() as tmpdir:
+                    a_path, a_computed_metadata = self.fetch_artifact_archive(
+                        a_uri, dest=tmpdir)
+
+                    uncompressed_path = os.path.join(tmpdir, 'src')
+                    uncompress(a_path, dest=uncompressed_path)
+
+                    directory = Directory.from_disk(
+                        path=uncompressed_path, data=True)
+                    # FIXME: Try not to load the full raw content in memory
+                    objects = directory.collect()
+
+                    contents = objects['content'].values()
+                    self.storage.content_add(contents)
+
+                    status_load = 'eventful'
+                    directories = objects['directory'].values()
+                    self.storage.directory_add(directories)
+
+                    # FIXME: This should be release. cf. D409 discussion
+                    revision = self.build_revision(uncompressed_path)
+                    revision.update({
+                        'type': 'tar',
+                        'synthetic': True,
+                        'directory': directory.hash,
+                    })
+                    revision['metadata'].update({
+                        'original_artifact': a_metadata,
+                        'hashes_artifact': a_computed_metadata
+                    })
+
+                    revision['id'] = identifier_to_bytes(
+                        revision_identifier(revision))
+                    self.storage.revision_add(revision)
+
+                    tmp_revisions[version].append({
+                        'filename': a_filename,
+                        'target': revision['id'],
+                    })
+
+        # Build and load the snapshot
         branches = {}
         for version, v_branches in tmp_revisions.items():
             if len(v_branches) == 1:
                 branch_name = 'releases/%s' % version
                 if version == default_release['version']:
                     branches[b'HEAD'] = {
                         'target_type': 'alias',
                         'target': branch_name.encode('utf-8'),
                     }

                 branches[branch_name] = {
                     'target_type': 'revision',
                     'target': v_branches[0]['target'],
                 }
             else:
                 for x in v_branches:
                     branch_name = 'releases/%s/%s' % (
                         version, x['filename'])
                     branches[branch_name] = {
                         'target_type': 'revision',
                         'target': x['target'],
                     }
         snapshot = {
             'branches': branches
         }
         snapshot['id'] = identifier_to_bytes(
             snapshot_identifier(snapshot))
         self.storage.snapshot_add([snapshot])

         # having come this far, we actually reached a full visit
         status_visit = 'full'

         # Update the visit's state
         self.storage.origin_visit_update(
             origin=self.origin,
-            visit_id=self.visit['visit'],
+            visit_id=visit,
             status=status_visit,
             snapshot=snapshot['id'])

         return {'status': status_load}
diff --git a/swh/loader/package/pypi.py b/swh/loader/package/pypi.py
index d1fcf85..7c92a3e 100644
--- a/swh/loader/package/pypi.py
+++ b/swh/loader/package/pypi.py
@@ -1,286 +1,259 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+import os
+
+from typing import Generator, Dict, Tuple, Sequence
 from urllib.parse import urlparse

 from pkginfo import UnpackedSDist

+import iso8601
+import requests
+
+from swh.model.identifiers import normalize_timestamp
+from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE
 from swh.loader.package.loader import PackageLoader

 try:
     from swh.loader.core._version import __version__
 except ImportError:
     __version__ = 'devel'


 DEFAULT_PARAMS = {
     'headers': {
         'User-Agent': 'Software Heritage Loader (%s)' % (
             __version__
         )
     }
 }
+

 class PyPIClient:
     """PyPI client in charge of talking to the pypi server.

     Args:
         url (str): PyPI instance's url (e.g:
             https://pypi.org/project/requests)

     api:
     - https://pypi.org/pypi/requests/json
     - https://pypi.org/pypi/requests/1.0.0/json (release description)

     """
     def __init__(self, url):
         self.version = __version__
         _url = urlparse(url)
         project_name = _url.path.split('/')[-1]
         self.url = '%s://%s/pypi/%s' % (
             _url.scheme, _url.netloc, project_name)
         self.session = requests.session()
         self.params = DEFAULT_PARAMS

     def _get(self, url):
         """Issue a GET query to the url.

         Args:
             url (str): Url

         Raises:
             ValueError in case of failing to query

         Returns:
             Response as dict if ok

         """
         response = self.session.get(url, **self.params)
         if response.status_code != 200:
             raise ValueError("Failed to query '%s'. Reason: %s" % (
                 url, response.status_code))

         return response.json()

     def info_project(self):
         """Given a url, retrieve the raw json response

         Returns:
             Main project information as dict.
""" return self._get(urljoin(self.url, 'json')) def info_release(self, release): """Given a project and a release name, retrieve the raw information for said project's release. Args: release (dict): Release information Returns: Release information as dict """ return self._get(urljoin(self.url, release, 'json')) -class ArchiveFetcher: - """Http/Local client in charge of downloading archives from a - remote/local server. +def download(url: str, dest: str) -> Tuple[str, Dict]: + """Download a remote tarball from url, uncompresses and computes swh hashes + on it. Args: - temp_directory (str): Path to the temporary disk location used - for downloading the release artifacts - - """ - def __init__(self, temp_directory=None): - self.temp_directory = temp_directory - self.session = requests.session() - self.params = DEFAULT_PARAMS + url: Artifact uri to fetch, uncompress and hash + dest: Directory to write the archive to - def download(self, url): - """Download the remote tarball url locally. + Raises: + ValueError in case of any error when fetching/computing - Args: - url (str): Url (file or http*) - - Raises: - ValueError in case of failing to query - - Returns: - Tuple of local (filepath, hashes of filepath) - - """ - url_parsed = urlparse(url) - if url_parsed.scheme == 'file': - path = url_parsed.path - response = LocalResponse(path) - length = os.path.getsize(path) - else: - response = self.session.get(url, **self.params, stream=True) - if response.status_code != 200: - raise ValueError("Fail to query '%s'. Reason: %s" % ( - url, response.status_code)) - length = int(response.headers['content-length']) - - filepath = os.path.join(self.temp_directory, os.path.basename(url)) - - h = MultiHash(length=length) - with open(filepath, 'wb') as f: - for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE): - h.update(chunk) - f.write(chunk) - - actual_length = os.path.getsize(filepath) - if length != actual_length: - raise ValueError('Error when checking size: %s != %s' % ( - length, actual_length)) + Returns: + Tuple of local (filepath, hashes of filepath) - return { - 'path': filepath, - 'length': length, - **h.hexdigest() - } + """ + response = requests.get(url, **DEFAULT_PARAMS, stream=True) + if response.status_code != 200: + raise ValueError("Fail to query '%s'. Reason: %s" % ( + url, response.status_code)) + length = int(response.headers['content-length']) + + filepath = os.path.join(dest, os.path.basename(url)) + + h = MultiHash(length=length) + with open(filepath, 'wb') as f: + for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE): + h.update(chunk) + f.write(chunk) + + actual_length = os.path.getsize(filepath) + if length != actual_length: + raise ValueError('Error when checking size: %s != %s' % ( + length, actual_length)) + + # hashes = h.hexdigest() + # actual_digest = hashes['sha256'] + # if actual_digest != artifact['sha256']: + # raise ValueError( + # '%s %s: Checksum mismatched: %s != %s' % ( + # project, version, artifact['sha256'], actual_digest)) + + return filepath, { + 'length': length, + **h.hexdigest() + } -def sdist_parse(path): +def sdist_parse(dir_path): """Given an uncompressed path holding the pkginfo file, returns a pkginfo parsed structure as a dict. The release artifact contains at their root one folder. For example: $ tar tvf zprint-0.0.6.tar.gz drwxr-xr-x root/root 0 2018-08-22 11:01 zprint-0.0.6/ ... Args: dir_path (str): Path to the uncompressed directory representing a release artifact from pypi. 
     Returns:
         the pkginfo parsed structure as a dict if any, None otherwise.

     """
-    def _to_dict(pkginfo):
-        """Given a pkginfo parsed structure, convert it to a dict.
-
-        Args:
-            pkginfo (UnpackedSDist): The sdist parsed structure
-
-        Returns:
-            parsed structure as a dict
-
-        """
-        m = {}
-        for k in pkginfo:
-            m[k] = getattr(pkginfo, k)
-        return m
-
     # Retrieve the root folder of the archive
     project_dirname = os.listdir(dir_path)[0]
     pkginfo_path = os.path.join(dir_path, project_dirname, 'PKG-INFO')
     if not os.path.exists(pkginfo_path):
         return None
     pkginfo = UnpackedSDist(pkginfo_path)
-    return _to_dict(pkginfo)
+    return pkginfo.__dict__


-class PyPILoader(PackageLoader):
-    """Load pypi origin's artifact releases into swh archive.
-
-    """
-    visit_type = 'pypi'
+def author(data: Dict) -> Dict:
+    """Given a dict of project/release artifact information (coming from
+    PyPI), returns an author subset.

-    def __init__(self, url):
-        super().__init__(url=url, visit_type='pypi')
-        self.client = PyPIClient(url)
-        self.archive_fetcher = ArchiveFetcher(
-            temp_directory=os.mkdtemp())
-        self.info = self.client.info_project()  # dict
-        self.artifact_metadata = {}
+    Args:
+        data (dict): Representing either artifact information or
+                     release information.

-    def get_versions(self):
-        """Return the list of all published package versions.
+    Returns:
+        swh-model dict representing a person.

-        """
-        return self.info['releases'].keys()
+    """
+    name = data.get('author')
+    email = data.get('author_email')

-    def retrieve_artifacts(self, version):
-        """Given a release version of a package, retrieve the associated
-        artifacts for such version.
+    if email:
+        fullname = '%s <%s>' % (name, email)
+    else:
+        fullname = name

-        Args:
-            version (str): Package version
+    if not fullname:
+        return {'fullname': b'', 'name': None, 'email': None}

-        Returns:
-            a list of metadata dict about the artifacts for that version.
-            For each dict, the 'name' field is the uri to retrieve the
-            artifacts
+    fullname = fullname.encode('utf-8')

-        """
-        artifact_metadata = self.info['releases'][version]
-        for meta in artifact_metadata:
-            url = meta.pop('url')
-            meta['uri'] = url
-        return artifact_metadata
+    if name is not None:
+        name = name.encode('utf-8')

-    def fetch_and_uncompress_artifact_archive(self, artifact_archive_path):
-        """Uncompress artifact archive to a temporary folder and returns its
-        path.
+    if email is not None:
+        email = email.encode('utf-8')

-        Args:
-            artifact_archive_path (str): Path to artifact archive to uncompress
+    return {'fullname': fullname, 'name': name, 'email': email}

-        Returns:
-            the uncompressed artifact path (str)

-        """
-        return self.sdist_parse(artifact_archive_path)
+class PyPILoader(PackageLoader):
+    """Load pypi origin's artifact releases into swh archive.

-    def get_project_metadata(self, artifact):
-        """Given an artifact dict, extract the relevant project metadata.
-        Those will be set within the revision's metadata built
+    """
+    visit_type = 'pypi'

-        Args:
-            artifact (dict): A dict of metadata about a release artifact.
+    def __init__(self, url):
+        super().__init__(url=url)
+        self.client = PyPIClient(url)
+        self._info = None

-        Returns:
-            dict of relevant project metadata (e.g, in pypi loader:
-            {'project_info': {...}})
+    @property
+    def info(self) -> Dict:
+        """Return the project metadata information (fetched from pypi
+        registry)

         """
-        version = artifact['version']
-        if version not in self.artifact_metadata:
-            self.artifact_metadata[version] = self.client.info_release(version)
-        return self.artifact_metadata[version]['info']
+        if not self._info:
+            self._info = self.client.info_project()  # dict
+        return self._info

-    def get_revision_metadata(self, artifact):
-        """Given an artifact dict, extract the relevant revision metadata.
-        Those will be set within the 'name' (bytes) and 'message' (bytes)
-        built revision fields.
-
-        Args:
-            artifact (dict): A dict of metadata about a release artifact.
+    def get_versions(self) -> Sequence[str]:
+        return self.info['releases'].keys()

-        Returns:
-            dict of relevant revision metadata (name, message keys with values
-            as bytes)
+    def get_artifacts(self, version: str) -> Generator[
+            Tuple[str, str, Dict], None, None]:
+        for meta in self.info['releases'][version]:
+            yield meta['filename'], meta['url'], meta

-        """
-        version = artifact['version']
-        # fetch information
-        if version not in self.artifact_metadata:
-            self.artifact_metadata[version] = self.client.info_release(version)
+    def fetch_artifact_archive(
+            self, artifact_uri: str, dest: str) -> Tuple[str, Dict]:
+        return download(artifact_uri, dest=dest)

-        releases = self.artifact_metadata[version]['releases']
-        for _artifact in releases[version]:
-            if _artifact['url'] == artifact['uri']:
-                break
+    def build_revision(self, artifact_uncompressed_path: str) -> Dict:
+        # Parse metadata (project, artifact metadata)
+        metadata = sdist_parse(artifact_uncompressed_path)

-        if not _artifact:  # should we?
-            raise ValueError('Revision metadata not found for artifact %s' % artifact['uri'])
+        # Build revision
+        name = metadata['version'].encode('utf-8')
+        message = metadata.get('message', '').encode('utf-8')
+        message = b'%s: %s' % (name, message) if message else name
+        _author = author(metadata)
+        _date = normalize_timestamp(
+            int(iso8601.parse_date(metadata['date']).timestamp()))

         return {
-            'name': version,
-            'message': _artifact.get('comment_text', '').encode('utf-8')
+            'name': name,
+            'message': message,
+            'author': _author,
+            'date': _date,
+            'committer': _author,
+            'committer_date': _date,
+            'parents': [],
+            'metadata': {
+                'intrinsic_metadata': metadata,
+            }
         }
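
A few review notes with small sketches against the API in this diff. First, what a third-party loader would look like on top of the refactored hooks. This is a minimal sketch, assuming a hypothetical registry that serves one tarball per version; `ExampleLoader`, `VERSIONS` and the example.org url are illustrative and not part of this diff. It reuses `download()` from the pypi module, and it supplies `get_default_release()` itself, since `load()` calls that method but this diff's base class does not define it yet:

import os
from typing import Dict, Generator, Sequence, Tuple

from swh.loader.package.loader import PackageLoader
from swh.loader.package.pypi import download


# Hypothetical registry contents: one tarball per version (illustrative)
VERSIONS = {
    '1.0.0': 'https://example.org/dist/example-1.0.0.tar.gz',
}


class ExampleLoader(PackageLoader):
    visit_type = 'example'

    def get_versions(self) -> Sequence[str]:
        return list(VERSIONS)

    def get_default_release(self) -> Dict:
        # load() compares this against each version when building the HEAD
        # alias branch; not defined by the diff's base class, so the sketch
        # provides it here
        return {'version': '1.0.0'}

    def get_artifacts(self, version: str) -> Generator[
            Tuple[str, str, Dict], None, None]:
        url = VERSIONS[version]
        # (artifact filename, artifact uri, raw artifact metadata)
        yield os.path.basename(url), url, {'url': url}

    def fetch_artifact_archive(
            self, artifact_uri: str, dest: str) -> Tuple[str, Dict]:
        # Reuse the streaming download + hashing helper from the pypi module
        return download(artifact_uri, dest=dest)

    def build_revision(self, artifact_uncompressed_path: str) -> Dict:
        # Static metadata for the sketch; a real loader would parse it out
        # of the uncompressed sources (cf. sdist_parse in pypi.py)
        person = {'fullname': b'', 'name': None, 'email': None}
        return {
            'name': b'1.0.0',
            'message': b'1.0.0',
            'author': person,
            'committer': person,
            'date': None,
            'committer_date': None,
            'parents': [],
            'metadata': {},
        }

As with the pypi loader, instantiating this relies on the usual SWH_CONFIG_FILENAME configuration providing at least the 'storage' key.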
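Second, the shape of the snapshot that load() assembles at the end of the visit, sketched for two single-artifact releases where '1.2.0' is the default release (revision ids are placeholders). Note that swh snapshot branch names are bytes; the loop above only encodes the alias target, so the str branch keys may be worth double-checking:

# Illustrative revision ids (placeholders, 20 bytes each)
rev_110 = b'\x01' * 20
rev_120 = b'\x02' * 20

snapshot = {
    'branches': {
        b'HEAD': {
            'target_type': 'alias',
            'target': b'releases/1.2.0',
        },
        b'releases/1.1.0': {
            'target_type': 'revision',
            'target': rev_110,
        },
        b'releases/1.2.0': {
            'target_type': 'revision',
            'target': rev_120,
        },
    }
}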
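Finally, how author() in pypi.py normalizes the PKG-INFO author fields into a swh person dict (values are illustrative):

author({'author': 'Jane Doe', 'author_email': 'jane@example.com'})
# -> {'fullname': b'Jane Doe <jane@example.com>',
#     'name': b'Jane Doe',
#     'email': b'jane@example.com'}

author({})
# -> {'fullname': b'', 'name': None, 'email': None}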