swh/loader/package/loader.py
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
import tempfile
import os

from typing import Generator, Dict, Tuple, Sequence, List

from swh.core.tarball import uncompress
from swh.core.config import SWHConfig
from swh.model.from_disk import Directory
from swh.model.identifiers import (
    revision_identifier, snapshot_identifier, identifier_to_bytes
)
from swh.storage import get_storage
from swh.loader.core.converters import content_for_storage

logger = logging.getLogger(__name__)

# Not implemented yet:
# - clean up disk routines from previous killed workers (when OOM-killed)
#   -> separation of concerns suggests this should be abstracted away from
#      the loader code
#   -> experience tells us it's complicated to do as such (T903, T964, T982,
#      etc...)
#
# - splitting the objects sent to storage into groups when there are too many
#   -> could be a specialized collaborator, a storage implementation or a
#      proxy which deals with this
#
# - model: swh.model.merkle.from_disk should output swh.model.model.* objects
#   to avoid this layer's conversion routine calls
#   -> take this up within swh.model's current implementation
#
# - exceptions are not trapped yet within the PackageLoader.load method


class PackageLoader:
    # Origin visit type (str) set by the loader
    visit_type = ''

    def __init__(self, url):
        """Loader's constructor. This raises an exception if the minimal
           required configuration is missing (cf. the `_check_configuration`
           method).

        Args:
            url (str): Origin url to load data from

        """
        # This expects to use the environment variable SWH_CONFIG_FILENAME
        self.config = SWHConfig.parse_config_file()
        self._check_configuration()
        self.storage = get_storage(**self.config['storage'])
        self.url = url
        self.visit_date = None  # loader core will populate it
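
    # Illustrative example (not part of this changeset): the configuration
    # file pointed at by SWH_CONFIG_FILENAME only needs a 'storage' entry
    # for this loader; assuming a storage RPC service running locally, a
    # minimal file could look like:
    #
    #   storage:
    #     cls: remote
    #     args:
    #       url: http://localhost:5002/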

    def _check_configuration(self):
        """Checks the minimal configuration required is set for the loader.

        If some required configuration is missing, an exception detailing
        the issue is raised.

        """
        if 'storage' not in self.config:
            raise ValueError(
                'Misconfiguration, at least the storage key should be set')

    def get_versions(self) -> Sequence[str]:
        """Return the list of all published package versions.

        Returns:
            Sequence of published versions

        """
        return []
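
    # Illustrative only: a concrete loader would typically return one entry
    # per published release, e.g. ['0.1.0', '0.2.0', '1.0.0'].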

    def get_artifacts(self, version: str) -> Generator[
            Tuple[str, str, Dict], None, None]:
        """Given a release version of a package, retrieve the associated
           artifact information for such version.

        Args:
            version: Package version

        Returns:
            (artifact filename, artifact uri, raw artifact metadata)

        """
        yield from {}
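
    # Illustrative only: for, say, version '0.2.0' of the GNU 8sync package,
    # an implementation could yield
    #   ('8sync-0.2.0.tar.gz',
    #    'https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz',
    #    {'archive': ..., 'date': ...})
    # i.e. the artifact filename, its uri, and the raw metadata the lister
    # provided for it.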

    def fetch_artifact_archive(
            self, artifact_archive_path: str, dest: str) -> Tuple[str, Dict]:
        """Fetch the artifact archive to a temporary folder and return its
           path along with its computed metadata.

        Args:
            artifact_archive_path: Path to artifact archive to uncompress
            dest: Directory to write the downloaded archive to

        Returns:
            Tuple of the locally retrieved artifact path and its computed
            metadata (e.g. hashes)

        """
        return '', {}
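
    # Illustrative sketch (not part of this changeset): a simple HTTP-based
    # implementation could stream the archive and hash it on the fly, much
    # like the GNULoader code this changeset replaces, e.g. (assuming
    # `requests`, plus MultiHash and HASH_BLOCK_SIZE from
    # swh.model.hashutil, are available):
    #
    #   response = requests.get(artifact_archive_path, stream=True)
    #   length = int(response.headers['content-length'])
    #   filepath = os.path.join(
    #       dest, os.path.basename(artifact_archive_path))
    #   h = MultiHash(length=length)
    #   with open(filepath, 'wb') as f:
    #       for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE):
    #           h.update(chunk)
    #           f.write(chunk)
    #   return filepath, {'length': length, **h.hexdigest()}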

    def build_revision(
            self, a_metadata: Dict, a_uncompressed_path: str) -> Dict:
        """Build the revision dict

        Returns:
            SWH data dict

        """
        return {}
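
    # Illustrative only: load() below expects at least a 'metadata' dict in
    # the value returned here (it gets updated with the original artifact
    # information), plus the usual revision fields, e.g.:
    #
    #   {
    #       'author': {...}, 'committer': {...},
    #       'date': ..., 'committer_date': ...,
    #       'message': b'...',
    #       'parents': [],
    #       'metadata': {...},
    #   }
    #
    # The 'type', 'synthetic', 'directory' and 'id' fields are filled in by
    # load() itself.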

    def get_default_release(self) -> str:
        """Retrieve the latest release version

        Returns:
            Latest version

        """
        return ''

    def load(self) -> Dict:
        """Load for a specific origin the associated contents.

        for each package version of the origin

        1. Fetch the files for one package version. By default, this can be
           implemented as a simple HTTP request. Loaders with more specific
           requirements can override this, e.g.: the PyPI loader checks the
           integrity of the downloaded files; the Debian loader has to
           download and check several files for one package version.

        2. Extract the downloaded files. By default, this would be a
           universal archive/tarball extraction.

           Loaders for specific formats can override this method (for
           instance, the Debian loader uses dpkg-source -x).

        3. Convert the extracted directory to a set of Software Heritage
           objects, using swh.model.from_disk.

        4. Extract the metadata from the unpacked directories. This would
           only be applicable for "smart" loaders like npm (parsing the
           package.json file), PyPI (parsing the PKG-INFO file) or Debian
           (parsing debian/changelog and debian/control).

           On "minimal-metadata" sources such as the GNU archive, the lister
           should provide the minimal set of metadata needed to populate the
           revision/release objects (authors, dates) as an argument to the
           task.

        5. Generate the revision/release objects for the given version,
           from the data generated at steps 3 and 4.

        end for each

        6. Generate and load the snapshot for the visit.

           Using the revisions/releases collected at step 5., and the branch
           information from step 0., generate a snapshot and load it into
           the Software Heritage archive.

        """
        status_load = 'uneventful'  # either: eventful, uneventful, failed
        status_visit = 'partial'  # either: partial, full
        tmp_revisions: Dict[str, List] = {}

        try:
            # Prepare origin and origin_visit
            origin = {'url': self.url}
            self.storage.origin_add([origin])
            visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
            visit_id = self.storage.origin_visit_add(
                origin=self.url,
                date=visit_date,
                type=self.visit_type)['visit']

            # Retrieve the default release (the "latest" one)
            default_release = self.get_default_release()

            # FIXME: Add load exceptions handling
            for version in self.get_versions():  # for each
                tmp_revisions[version] = []
                # `a_` stands for `artifact_`
                for a_filename, a_uri, a_metadata in self.get_artifacts(
                        version):
                    with tempfile.TemporaryDirectory() as tmpdir:
                        # a_c_: archive_computed_
                        a_path, a_c_metadata = self.fetch_artifact_archive(
                            a_uri, dest=tmpdir)

                        uncompressed_path = os.path.join(tmpdir, 'src')
                        uncompress(a_path, dest=uncompressed_path)

                        directory = Directory.from_disk(
                            path=uncompressed_path.encode('utf-8'),
                            data=True)
                        # FIXME: Try not to load the full raw content in
                        # memory
                        objects = directory.collect()

                        contents = objects['content'].values()
                        self.storage.content_add(
                            map(content_for_storage, contents))
                        status_load = 'eventful'

                        directories = objects['directory'].values()
                        self.storage.directory_add(directories)

                        # FIXME: This should be release. cf. D409 discussion
                        revision = self.build_revision(
                            a_metadata, uncompressed_path)
                        revision.update({
                            'type': 'tar',
                            'synthetic': True,
                            'directory': directory.hash,
                        })
                        revision['metadata'].update({
                            'original_artifact': a_metadata,
                            'hashes_artifact': a_c_metadata
                        })
                        revision['id'] = identifier_to_bytes(
                            revision_identifier(revision))
                        self.storage.revision_add([revision])

                        tmp_revisions[version].append({
                            'filename': a_filename,
                            'target': revision['id'],
                        })

            # Build and load the snapshot
            branches = {}
            for version, v_branches in tmp_revisions.items():
                if len(v_branches) == 1:
                    branch_name = ('releases/%s' % version).encode('utf-8')
                    if version == default_release:
                        branches[b'HEAD'] = {
                            'target_type': 'alias',
                            'target': branch_name,
                        }

                    branches[branch_name] = {
                        'target_type': 'revision',
                        'target': v_branches[0]['target'],
                    }
                else:
                    for x in v_branches:
                        branch_name = ('releases/%s/%s' % (
                            version, x['filename'])).encode('utf-8')
                        branches[branch_name] = {
                            'target_type': 'revision',
                            'target': x['target'],
                        }
            snapshot = {
                'branches': branches
            }
            snapshot['id'] = identifier_to_bytes(
                snapshot_identifier(snapshot))
            self.storage.snapshot_add([snapshot])

            # come so far, we actually reached a full visit
            status_visit = 'full'

            # Update the visit's state
            self.storage.origin_visit_update(
                origin=self.url, visit_id=visit_id, status=status_visit,
                snapshot=snapshot)
        except ValueError as e:
            logger.warning('Failed to load %s. Reason: %s' % (self.url, e))
        finally:
            return {'status': status_load}
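

# ---------------------------------------------------------------------------
# Illustrative sketch only, not part of this changeset: a minimal subclass
# for a hypothetical forge that serves one tarball per version over HTTP.
# Every name below (ExampleLoader, the releases.json layout, the example.org
# URL) is an assumption made for the sake of the example.

import requests  # only needed by the example below

from swh.model.identifiers import normalize_timestamp


class ExampleLoader(PackageLoader):
    visit_type = 'example'

    def __init__(self, url):
        super().__init__(url)
        # assumed lister/forge-provided metadata:
        # {version: {'archive': <uri>, 'date': <unix timestamp>}}
        self.package_info = requests.get('%s/releases.json' % url).json()

    def get_versions(self):
        return list(self.package_info)

    def get_default_release(self):
        # toy heuristic: lexicographic maximum of the version strings
        return max(self.package_info, default='')

    def get_artifacts(self, version):
        a_metadata = self.package_info[version]
        a_uri = a_metadata['archive']
        yield os.path.basename(a_uri), a_uri, a_metadata

    def fetch_artifact_archive(self, artifact_archive_path, dest):
        # download the archive into `dest` and return its local path
        filepath = os.path.join(
            dest, os.path.basename(artifact_archive_path))
        response = requests.get(artifact_archive_path, stream=True)
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=65536):
                f.write(chunk)
        return filepath, {'length': os.path.getsize(filepath)}

    def build_revision(self, a_metadata, a_uncompressed_path):
        # minimal synthetic revision; a real loader would extract authors
        # and dates from the lister metadata or the unpacked sources
        date = normalize_timestamp(int(a_metadata.get('date', 0)))
        person = {'name': b'', 'email': b'', 'fullname': b''}
        return {
            'message': b'synthetic revision message',
            'author': person,
            'committer': person,
            'date': date,
            'committer_date': date,
            'parents': [],
            'metadata': {'package': a_metadata},
        }


if __name__ == '__main__':
    # requires SWH_CONFIG_FILENAME to point at a valid configuration file
    loader = ExampleLoader('https://example.org/packages/hello')
    print(loader.load())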