Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/loader.py
- This file was added.
# Copyright (C) 2019 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import os | |||||
import shutil | |||||
from abc import abstractmethod | |||||
from tempfile import mkdtemp | |||||
from swh.core import tarball | |||||
from swh.loader.core.utils import clean_dangling_folders | |||||
from swh.loader.core.loader import BufferedLoader | |||||
from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE | |||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | |||||
from swh.model.from_disk import Directory | |||||
from .revision import constructRevision | |||||
from swh.model.identifiers import ( | |||||
identifier_to_bytes, revision_identifier, snapshot_identifier | |||||
) | |||||
DEBUG_MODE = '** DEBUG MODE **' | |||||
class PackageLoader(BufferedLoader, constructRevision): | |||||
"""Package loader class for package manager loader | |||||
A loader is a component of the Software Heritage architecture responsible | |||||
for reading a source code origin and add new file contents in the object | |||||
storage and repository structure in the storage database. | |||||
    The loading task for package managers is broadly similar for all
    of them: notably it includes querying an API to get metadata and
    retrieval of the package source code and its ingestion into the archive.
    The steps involved in the ingestion of the package source code are
    automated by this class, which eases the creation of new loaders.
    API querying and obtaining the metadata for a package is done separately
    for each new loader by overriding the :func:`convert_to_standard_format`
    function. It returns all the information about the package in a
    specific format, after which the whole process of downloading,
    decompressing, creating and loading snapshots is automated by this class.
Required Overrides: | |||||
loader_name | |||||
class_name | |||||
def convert_to_standard_format | |||||
Optional Overrides: | |||||
def cleanup_artifact | |||||
def extract_metadata | |||||
""" | |||||
loader_name = None | |||||
"""Package manager name""" # e.g pypi | |||||
class_name = None | |||||
"""Loader class name""" # eg PyPILoader | |||||
def __init__(self): | |||||
self.TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.%s.' % self.loader_name | |||||
super().__init__(logging_class='swh.loader.%s.%s' % (self.loader_name, | |||||
self.class_name)) | |||||
self.local_cache = None | |||||
self.dir_path = None | |||||
temp_directory = self.config['temp_directory'] | |||||
os.makedirs(temp_directory, exist_ok=True) | |||||
self.temp_directory = mkdtemp( | |||||
suffix='-%s' % os.getpid(), | |||||
prefix=self.TEMPORARY_DIR_PREFIX_PATTERN, | |||||
dir=temp_directory) | |||||
self.debug = self.config.get('debug', False) | |||||
@abstractmethod | |||||
def convert_to_standard_format(self, kwargs): | |||||
"""Fetch the metadata and convert it into a standard format | |||||
This method serve two purpose: | |||||
* Make API call to get the package versions and metadata(if needed) | |||||
* Convert the information received from lister and API call to a | |||||
standard format | |||||
The standard format is a dict with keys | |||||
`name` (str): Holding name of the package | |||||
`origin_url` (str): Holding the origin_url of the package | |||||
`tarballs` (list): A list of dicts where each dict contains | |||||
information related to a single version of the | |||||
package. The `url` key in the dict is necessary and will | |||||
hold tarball url. Other keys are optional and as per | |||||
availability of metadata. | |||||
Note: Keys `nature` and `response` are reserved keywords and cannot be | |||||
used in the dict that are present under key `tarballs` | |||||
Args: | |||||
kwargs (dict): Dict of arbitrary keyword arguments passed by | |||||
the lister. | |||||
Returns: | |||||
dict: Containing information as directed by the guidelines | |||||
mentioned above | |||||
Example: | |||||
{ | |||||
name:'8sync', | |||||
origin_url:'https://ftp.gnu.org/gnu/8sync/', | |||||
tarballs:[{url: 'https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz', | |||||
time_modified: 1562878592 }, | |||||
{url: 'https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz', | |||||
time_modified: 1599887203 }, | |||||
... | |||||
] | |||||
} | |||||
""" | |||||
pass | |||||
def cleanup_artifact(self, uncompressed_path): | |||||
"""Clean up unnecessary files from the downloaded tarball | |||||
also some special operation if needed. | |||||
Implementation of this method depends on the file structure of the | |||||
tarball. It is used to clean up files from the uncompressed tarball | |||||
that are not to be archived(eg binaries). | |||||
Args: | |||||
uncompressed_path (str): Path of uncompressed tarball | |||||
Returns: | |||||
uncompressed_path (str): Path of uncompressed tarball after | |||||
removing unnecessary files | |||||
""" | |||||
return uncompressed_path | |||||
def extract_metadata(self, package_path, package_source_data): | |||||
"""Fetch the metadata from the downloaded file. | |||||
Override this method to perform metadata extraction for each version | |||||
of a package from the uncompressed package by parsing over the file | |||||
containing metadata(package.json, PKG-INFO, ...). | |||||
Add the extracted metadata to the present `package_source_data` which | |||||
contains information related the focused package version. | |||||
Args: | |||||
package_path (str): Uncompressed package version path | |||||
package_source_data (dict): Information about the focused package | |||||
version. | |||||
Returns: | |||||
dict: Updated information about the focused package | |||||
version. | |||||
""" | |||||
return package_source_data | |||||
def prepare_origin_visit(self, *args, **kwargs): | |||||
"""Prepare package visit. | |||||
Args: | |||||
**kwargs: Arbitrary keyword arguments passed by the lister. | |||||
""" | |||||
# reset statuses | |||||
self._load_status = 'uneventful' | |||||
self._visit_status = 'full' | |||||
self.done = False | |||||
# fetch the package metadata from the registry | |||||
self.package_details = self.convert_to_standard_format(kwargs) | |||||
self.set_origin() | |||||
self.visit_date = None # loader core will populate it | |||||
def set_origin(self): | |||||
"""Assign value to self.origin. | |||||
""" | |||||
self.origin = { | |||||
'url': self.package_details['origin_url'], | |||||
'type': self.loader_name, | |||||
} | |||||
def prepare(self, *args, **kwargs): | |||||
"""Prepare effective loading of source tarballs for a package manager | |||||
package. | |||||
Args: | |||||
**kwargs: Arbitrary keyword arguments passed by the lister. | |||||
""" | |||||
self.package_contents = [] | |||||
self.package_directories = [] | |||||
self.package_revisions = [] | |||||
self.package_source_data = [] | |||||
self.package_temp_dir = os.path.join(self.temp_directory, | |||||
self.package_details['name']) | |||||
last_snapshot = self.last_snapshot() | |||||
self.known_versions = self.get_known_versions(last_snapshot) | |||||
self.new_versions = \ | |||||
self.prepare_package_versions(self.package_details['tarballs'], | |||||
self.known_versions) | |||||
def last_snapshot(self): | |||||
"""Retrieve the last snapshot of the package if any. | |||||
""" | |||||
visit = self.storage.origin_visit_get_latest( | |||||
self.origin['url'], require_snapshot=True) | |||||
if visit: | |||||
return snapshot_get_all_branches(self.storage, visit['snapshot']) | |||||
def _prepare_package_version(self, package_source_data, tarball_request): | |||||
"""Process the package release version. | |||||
The following operations are performed: | |||||
1. Download the tarball | |||||
2. Uncompress the tarball | |||||
3. Delete unnecessary files (optional) | |||||
4. Parse the file associated to the package version to extract | |||||
metadata (optional) | |||||
Args: | |||||
package_source_data (dict): containing information | |||||
about the focused package version. | |||||
known_versions (dict): may be provided by the loader, it enables | |||||
to filter out versions already ingested in the archive. | |||||
Return: | |||||
Tuple[dict, str]: tuples containing the following | |||||
members: | |||||
* a dict holding package tarball information and metadata | |||||
* a string holding the path of the uncompressed package to | |||||
load into the archive | |||||
""" | |||||
url = package_source_data['url'] | |||||
tarball_path, hashes = self.download_generate_hash(tarball_request, | |||||
url) | |||||
uncompressed_path = os.path.join(self.package_temp_dir, 'uncompressed', | |||||
os.path.basename(url)) # SEE ME | |||||
package_source_data['nature'] = self.uncompress_tarball( | |||||
tarball_path, uncompressed_path) | |||||
# remove tarball | |||||
os.remove(tarball_path) | |||||
if self.tarball_invalid: | |||||
return None, None | |||||
package_path = self.cleanup_artifact(uncompressed_path) | |||||
package_source_data = self.extract_metadata(package_path, | |||||
package_source_data) | |||||
self.package_source_data.append(package_source_data) | |||||
return package_source_data, package_path | |||||
def download_generate_hash(self, response, url): | |||||
"""Store file in temp directory and computes hash of its filepath. | |||||
Args: | |||||
response (Response): Server response of the url | |||||
url (str): Url of the tarball | |||||
Returns: | |||||
Tuple of local (filepath, hashes of filepath) | |||||
""" | |||||
length = int(response.headers['content-length']) | |||||
os.makedirs(self.package_temp_dir, exist_ok=True) | |||||
# SEE ME | |||||
filepath = os.path.join(self.package_temp_dir, os.path.basename(url)) | |||||
h = self.write_file(filepath, length, response) | |||||
self.check_file(filepath, length) | |||||
hashes = { | |||||
'length': length, | |||||
**h.hexdigest() | |||||
} | |||||
return filepath, hashes | |||||
def write_file(self, filepath, length, response): | |||||
"""Convert the server response to a file. | |||||
""" | |||||
h = MultiHash(length=length) | |||||
with open(filepath, 'wb') as f: | |||||
for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE): | |||||
h.update(chunk) | |||||
f.write(chunk) | |||||
return h | |||||
def check_file(self, filepath, length): | |||||
"""Check for the validity of the tarball downloaded. | |||||
""" | |||||
actual_length = os.path.getsize(filepath) | |||||
if length != actual_length: | |||||
raise ValueError('Error when checking size: %s != %s' % ( | |||||
length, actual_length)) | |||||
def uncompress_tarball(self, filepath, path): | |||||
"""Uncompress a tarball. | |||||
Args: | |||||
filepath (str): Path of tarball to uncompress | |||||
path (str): The destination folder where to uncompress the tarball | |||||
Returns: | |||||
The nature of the tarball, zip or tar. | |||||
""" | |||||
# filepath = tempdir + url | |||||
try: | |||||
self.tarball_invalid = False | |||||
return tarball.uncompress(filepath, path) | |||||
except Exception: | |||||
self.tarball_invalid = True | |||||
return None | |||||
def fetch_data(self): | |||||
"""Called once per release artifact version (can be many for one | |||||
release). | |||||
This will for each call: | |||||
- retrieve a release artifact (associated to a release version) | |||||
- Computes the swh objects | |||||
Returns: | |||||
True as long as data to fetch exist | |||||
""" | |||||
data = None | |||||
if self.done: | |||||
return False | |||||
try: | |||||
data = next(self.new_versions) | |||||
self._load_status = 'eventful' | |||||
except StopIteration: | |||||
self.done = True | |||||
return False | |||||
package_source_data, dir_path = data | |||||
# package release tarball was corrupted | |||||
if self.tarball_invalid: | |||||
return not self.done | |||||
dir_path = dir_path.encode('utf-8') | |||||
directory = Directory.from_disk(path=dir_path, data=True) | |||||
objects = directory.collect() | |||||
objects = self.check_objects(objects) | |||||
self.package_contents = objects['content'].values() | |||||
self.package_directories = objects['directory'].values() | |||||
revision = self.compute_revision(directory, | |||||
package_source_data) | |||||
revision['id'] = identifier_to_bytes( | |||||
revision_identifier(revision)) | |||||
self.package_revisions.append(revision) | |||||
self.update_known_version(package_source_data, revision['id']) | |||||
self.log.debug('Removing unpacked package files at %s', dir_path) | |||||
shutil.rmtree(dir_path) | |||||
return not self.done | |||||
def update_known_version(self, package_source_data, revision_id): | |||||
"""Update the `known_versions` variable with new discovered versions | |||||
Args: | |||||
package_source_data (dict): Metadata available for a particular package | |||||
version | |||||
revision_id (str): Revision id of the revsion of focused package | |||||
version | |||||
""" | |||||
key = self.get_key() | |||||
package_key = package_source_data[key] | |||||
self.known_versions[package_key] = revision_id # SEE ME | |||||
def check_objects(self, objects): | |||||
"""The the object for necessary fields and initialise it if not | |||||
already present. | |||||
""" | |||||
if 'content' not in objects: | |||||
objects['content'] = {} | |||||
if 'directory' not in objects: | |||||
objects['directory'] = {} | |||||
return objects | |||||
def store_data(self): | |||||
"""Store fetched data in the database. | |||||
""" | |||||
self.maybe_load_contents(self.package_contents) | |||||
self.maybe_load_directories(self.package_directories) | |||||
self.maybe_load_revisions(self.package_revisions) | |||||
if self.done: | |||||
self.generate_and_load_snapshot() | |||||
self.flush() | |||||
def generate_and_load_snapshot(self): | |||||
"""Generate and load snapshot for the package visit. | |||||
""" | |||||
snapshot = { | |||||
'branches': self.generate_branches(), | |||||
} | |||||
snapshot['id'] = identifier_to_bytes(snapshot_identifier(snapshot)) | |||||
self.maybe_load_snapshot(snapshot) | |||||
def generate_branches(self): | |||||
"""Generate branches for the focused package | |||||
""" | |||||
branches = {} | |||||
key = self.get_key() | |||||
for version in self.package_source_data: | |||||
branch_name = self.branch_name(version) | |||||
target = self.target_from_version(version[key]) | |||||
branches[branch_name] = target | |||||
branches = self.find_head(branches, branch_name) | |||||
# How to find HEAD and branch name? | |||||
if not target: | |||||
self.package_visit_status = 'partial' | |||||
return branches | |||||
def find_head(self, branches, branch_name): | |||||
"""Make branch head. | |||||
Checks if the current version is the latest version. Make it as head | |||||
if it is the latest version. | |||||
Args: | |||||
branches (dict): Branches for the focused package. | |||||
branch_name (str): Branch name | |||||
Returns: | |||||
dict: Branches for the focused package | |||||
""" | |||||
if True: # I don't know what to do here | |||||
# we need some condition here to check if the version is the | |||||
# latest version. I don't know how to check that because all | |||||
# of the package manager do not provide field like version | |||||
branches[b'HEAD'] = { | |||||
'target_type': 'alias', | |||||
'target': branch_name, | |||||
} | |||||
return branches | |||||
def branch_name(self, version): | |||||
"""Find branch name. | |||||
Args: | |||||
version (dict): Information related to a particular package version | |||||
Returns: | |||||
Branch name encoded in ascii | |||||
""" | |||||
# How to tackle this | |||||
pass | |||||
def target_from_version(self, key_value): | |||||
target = self.known_versions.get(key_value) | |||||
return { | |||||
'target': target, | |||||
'target_type': 'revision', | |||||
} if target else None | |||||
def pre_cleanup(self): | |||||
"""To prevent disk explosion if some other workers exploded | |||||
in mid-air (OOM killed), we try and clean up dangling files. | |||||
""" | |||||
if self.debug: | |||||
self.log.warn('%s Will not pre-clean up temp dir %s' % ( | |||||
DEBUG_MODE, self.temp_directory | |||||
)) | |||||
return | |||||
clean_dangling_folders(self.config['temp_directory'], | |||||
pattern_check=self.TEMPORARY_DIR_PREFIX_PATTERN, | |||||
log=self.log) | |||||
def cleanup(self): | |||||
"""Clean up temporary disk use after downloading and extracting | |||||
package tarballs. | |||||
""" | |||||
if self.debug: | |||||
self.log.warn('%s Will not clean up temp dir %s' % ( | |||||
DEBUG_MODE, self.temp_directory | |||||
)) | |||||
return | |||||
if os.path.exists(self.temp_directory): | |||||
self.log.debug('Clean up %s' % self.temp_directory) | |||||
shutil.rmtree(self.temp_directory) |