D1824.diff
D1824: swh.loader.package: Implement GNU Loader
diff --git a/swh/loader/package/__init__.py b/swh/loader/package/__init__.py
new file mode 100644
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/package/loader.py
@@ -0,0 +1,475 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import shutil
+import requests
+
+try:
+ from ._version import __version__
+except ImportError:
+ __version__ = 'devel'
+
+
+from tempfile import mkdtemp
+
+from swh.core import tarball
+from swh.loader.core.utils import clean_dangling_folders
+from swh.loader.core.loader import BufferedLoader
+from swh.model.identifiers import normalize_timestamp
+from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE
+from swh.model.from_disk import Directory
+
+from swh.model.identifiers import (
+ identifier_to_bytes, revision_identifier, snapshot_identifier
+)
+
+DEBUG_MODE = '** DEBUG MODE **'
+
+
+class GNULoader(BufferedLoader):
+
+ SWH_PERSON = {
+ 'name': b'Software Heritage',
+ 'fullname': b'Software Heritage',
+ 'email': b'robot@softwareheritage.org'
+ }
+ REVISION_MESSAGE = b'swh-loader-package: synthetic revision message'
+
+ def __init__(self):
+ self.TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.gnu.'
+ super().__init__(logging_class='swh.loader.package.GNULoader')
+
+ self.dir_path = None
+ temp_directory = self.config['temp_directory']
+ os.makedirs(temp_directory, exist_ok=True)
+
+ self.temp_directory = mkdtemp(
+ suffix='-%s' % os.getpid(),
+ prefix=self.TEMPORARY_DIR_PREFIX_PATTERN,
+ dir=temp_directory)
+
+ self.debug = self.config.get('debug', False)
+ self.session = requests.Session()
+ self.params = {
+ 'headers': {
+ 'User-Agent': 'Software Heritage Loader (%s)' % (
+ __version__
+ )
+ }
+ }
+
+ def pre_cleanup(self):
+ """To prevent disk explosion if some other workers exploded
+ in mid-air (OOM killed), we try to clean up dangling files.
+
+ """
+ if self.debug:
+ self.log.warning('%s Will not pre-clean up temp dir %s',
+ DEBUG_MODE, self.temp_directory)
+ return
+ clean_dangling_folders(self.temp_directory,
+ pattern_check=self.TEMPORARY_DIR_PREFIX_PATTERN,
+ log=self.log)
+
+ def prepare_origin_visit(self, name, origin_url, **kwargs):
+ """Prepare package visit.
+
+ Args:
+ name (str): Package Name
+ origin_url (str): Package origin url
+ **kwargs: Arbitrary keyword arguments passed by the lister.
+
+ """
+ # reset statuses
+ self._load_status = 'uneventful'
+ self._visit_status = 'full'
+ self.done = False
+
+ self.origin = {
+ 'url': origin_url,
+ 'type': 'gnu',
+ }
+
+ self.visit_date = None # loader core will populate it
+
+ def prepare(self, name, origin_url, **kwargs):
+ """Prepare effective loading of source tarballs for a package manager
+ package.
+
+ Args:
+ name (str): Package Name
+ origin_url (str): Package origin url
+ **kwargs: Arbitrary keyword arguments passed by the lister.
+
+ """
+ self.package_contents = []
+ self.package_directories = []
+ self.package_revisions = []
+ self.all_version_data = []
+ self.latest_timestamp = 0
+ # Consolidate the data into one dictionary to eliminate the need to
+ # pass all the parameters to every method that requires them
+ self.package_details = {
+ 'name': name,
+ 'origin_url': origin_url,
+ 'tarballs': kwargs['tarballs'],
+ }
+
+ self.package_temp_dir = os.path.join(self.temp_directory,
+ self.package_details['name'])
+
+ self.new_versions = \
+ self.prepare_package_versions(self.package_details['tarballs'])
+
+ def prepare_package_versions(self, tarballs):
+ """
+ Instantiate a generator that will process a specific package release
+ version at each iteration step. The following operations will be
+ performed:
+
+ 1. Create a temporary directory to download and extract the
+ release tarball.
+ 2. Download the tarball.
+ 3. Uncompress the tarball.
+ 4. Parse the file associated to the package version to extract
+ metadata (optional).
+ 5. Delete unnecessary files (optional).
+
+ Args:
+ tarballs (list): a list of dicts, each describing one tarball,
+ as provided by the lister.
+
+ Yields:
+ Tuple[dict, str]: tuples containing the following
+ members:
+
+ * a dict holding package tarball information and metadata
+ * a string holding the path of the uncompressed package to
+ load into the archive
+
+ """
+ for package_version_data in tarballs:
+
+ tarball_url = package_version_data['archive']
+ tarball_request = self._request(tarball_url,
+ throw_error=False)
+ if tarball_request.status_code == 404:
+ self.log.warning('Tarball url %s returns a 404 error.',
+ tarball_url)
+ self._visit_status = 'partial'
+ # FIX ME: Do we need to mark it `partial` here
+ continue
+
+ yield self._prepare_package_version(package_version_data,
+ tarball_request)
+
+ def _request(self, url, throw_error=True):
+ """Request the remote tarball url.
+
+ Args:
+ url (str): Url (file or http*).
+
+ Raises:
+ ValueError: if the query fails and `throw_error` is True.
+
+ Returns:
+ Response: the server's response to the request.
+
+ """
+ response = self.session.get(url, **self.params, stream=True)
+ if response.status_code != 200 and throw_error:
+ raise ValueError("Fail to query '%s'. Reason: %s" % (
+ url, response.status_code))
+
+ return response
+
+ def _prepare_package_version(self, package_version_data, tarball_request):
+ """Process the package release version.
+
+ The following operations are performed:
+
+ 1. Download the tarball
+ 2. Uncompress the tarball
+ 3. Delete unnecessary files (optional)
+ 4. Parse the file associated to the package version to extract
+ metadata (optional)
+
+ Args:
+ package_version_data (dict): information about the package
+ version being processed.
+ tarball_request (Response): server response for the tarball url.
+
+ Returns:
+ Tuple[dict, str]: a tuple containing the following
+ members:
+
+ * a dict holding package tarball information and metadata
+ * a string holding the path of the uncompressed package to
+ load into the archive
+
+ (None, None) is returned if the tarball turns out to be invalid.
+
+ """
+ url = package_version_data['archive']
+ tarball_path, hashes = self.download_generate_hash(tarball_request,
+ url)
+ uncompressed_path = os.path.join(self.package_temp_dir, 'uncompressed',
+ os.path.basename(url)) # SEE ME
+ self.uncompress_tarball(tarball_path, uncompressed_path)
+
+ # remove tarball
+ os.remove(tarball_path)
+
+ if self.tarball_invalid:
+ return None, None
+
+ return package_version_data, uncompressed_path
+
+ def download_generate_hash(self, response, url):
+ """Store file in temp directory and computes hash of its filepath.
+
+ Args:
+ response (Response): Server response of the url
+ url (str): Url of the tarball
+
+ Returns:
+ Tuple of (local filepath, hashes of the file content)
+
+ """
+ length = int(response.headers['content-length'])
+ os.makedirs(self.package_temp_dir, exist_ok=True)
+ # SEE ME
+ filepath = os.path.join(self.package_temp_dir, os.path.basename(url))
+
+ # Convert the server response to a file.
+ h = MultiHash(length=length)
+ with open(filepath, 'wb') as f:
+ for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE):
+ h.update(chunk)
+ f.write(chunk)
+
+ # Check for the validity of the tarball downloaded.
+ actual_length = os.path.getsize(filepath)
+ if length != actual_length:
+ raise ValueError('Error when checking size: %s != %s' % (
+ length, actual_length))
+
+ hashes = {
+ 'length': length,
+ **h.hexdigest()
+ }
+ return filepath, hashes
+
+ def uncompress_tarball(self, filepath, path):
+ """Uncompress a tarball.
+
+ Args:
+ filepath (str): Path of tarball to uncompress
+ path (str): The destination folder where to uncompress the tarball
+
+ Sets `self.tarball_invalid` (and marks the visit 'partial') if the
+ tarball cannot be uncompressed.
+
+ """
+ try:
+ self.tarball_invalid = False
+ tarball.uncompress(filepath, path)
+ except Exception:
+ self.tarball_invalid = True
+ self._visit_status = 'partial'
+
+ def fetch_data(self):
+ """Called once per release artifact version (can be many for one
+ release).
+
+ Each call will:
+ - retrieve a release artifact (associated to a release version)
+ - compute the swh objects
+
+ Returns:
+ True as long as there is data left to fetch
+
+ """
+ data = None
+ if self.done:
+ return False
+
+ try:
+ data = next(self.new_versions)
+ self._load_status = 'eventful'
+ except StopIteration:
+ self.done = True
+ return False
+
+ package_version_data, dir_path = data
+
+ # package release tarball was corrupted
+ if self.tarball_invalid:
+ return not self.done
+
+ dir_path = dir_path.encode('utf-8')
+ directory = Directory.from_disk(path=dir_path, data=True)
+ objects = directory.collect()
+
+ objects.setdefault('content', {})
+ objects.setdefault('directory', {})
+
+ self.package_contents = objects['content'].values()
+ self.package_directories = objects['directory'].values()
+
+ revision = self.build_revision(directory,
+ package_version_data)
+
+ revision['id'] = identifier_to_bytes(
+ revision_identifier(revision))
+ self.package_revisions.append(revision)
+ self.log.debug(revision)
+ package_version_data['id'] = revision['id']
+ self.all_version_data.append(package_version_data)
+
+ # To find the latest version
+ if self.latest_timestamp < int(package_version_data['date']):
+ self.latest_timestamp = int(package_version_data['date'])
+
+ self.log.debug('Removing unpacked package files at %s', dir_path)
+ shutil.rmtree(dir_path)
+
+ return not self.done
+
+ def build_revision(self, directory, package_version_data):
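+ """Build a synthetic revision for one package version.
+
+ Args:
+ directory (Directory): uncompressed package content loaded from disk.
+ package_version_data (dict): tarball information for the version.
+
+ Returns:
+ dict: the synthetic revision, without its 'id' (computed by the
+ caller).
+
+ """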
+ normalize_date = normalize_timestamp(int(package_version_data['date']))
+ return {
+ 'metadata': {
+ 'package': {
+ 'date': package_version_data['date'],
+ 'archive': package_version_data['archive'],
+ },
+ },
+ 'date': normalize_date,
+ 'committer_date': normalize_date,
+ 'author': self.SWH_PERSON,
+ 'committer': self.SWH_PERSON,
+ 'type': 'tar',
+ 'message': self.REVISION_MESSAGE,
+ 'directory': directory.hash,
+ 'synthetic': True,
+ 'parents': [],
+ }
+
+ def store_data(self):
+ """Store fetched data in the database.
+
+ """
+ self.maybe_load_contents(self.package_contents)
+ self.maybe_load_directories(self.package_directories)
+ self.maybe_load_revisions(self.package_revisions)
+
+ if self.done:
+ self.generate_and_load_snapshot()
+ self.flush()
+
+ def generate_and_load_snapshot(self):
+ """Generate and load snapshot for the package visit.
+
+ """
+ branches = {}
+ for version_data in self.all_version_data:
+ branch_name = self.find_branch_name(version_data['archive'])
+
+ target = self.target_from_version(version_data['id'])
+ branches[branch_name] = target
+ branches = self.find_head(branches, branch_name,
+ version_data['date'])
+
+ if not target:
+ self._visit_status = 'partial'
+
+ snapshot = {
+ 'branches': branches,
+ }
+
+ snapshot['id'] = identifier_to_bytes(snapshot_identifier(snapshot))
+ self.maybe_load_snapshot(snapshot)
+
+ def find_branch_name(self, url):
+ """Extract branch name from tarball url
+
+ Args:
+ url (str): Tarball URL
+
+ Returns:
+ bytes: Branch name
+
+ Example:
+ For url = https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz
+
+ >>> find_branch_name(url)
+ b'release/8sync-0.2.0'
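+
+ For url = https://example.org/foo-1.0.zip (hypothetical zip case)
+
+ >>> find_branch_name(url)
+ b'release/foo-1.0'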
+
+ """
+ branch_name = ''
+ filename = os.path.basename(url)
+ filename_parts = filename.split(".")
+ if len(filename_parts) > 1 and filename_parts[-2] == 'tar':
+ branch_name = '.'.join(filename_parts[:-2])
+ elif len(filename_parts) > 1 and filename_parts[-1] == 'zip':
+ branch_name = '.'.join(filename_parts[:-1])
+
+ return ('release/%s' % branch_name).encode('ascii')
+
+ def find_head(self, branches, branch_name, timestamp):
+ """Make branch head.
+
+ Checks if the current version is the latest version. Make it as head
+ if it is the latest version.
+
+ Args:
+ branches (dict): Branches for the focused package.
+ branch_name (str): Branch name
+
+ Returns:
+ dict: Branches for the focused package
+
+ """
+ if self.latest_timestamp == int(timestamp):
+ branches[b'HEAD'] = {
+ 'target_type': 'alias',
+ 'target': branch_name,
+ }
+ return branches
+
+ def target_from_version(self, revision_id):
+ return {
+ 'target': revision_id,
+ 'target_type': 'revision',
+ } if revision_id else None
+
+ def load_status(self):
+ return {
+ 'status': self._load_status,
+ }
+
+ def visit_status(self):
+ return self._visit_status
+
+ def cleanup(self):
+ """Clean up temporary disk use after downloading and extracting
+ package tarballs.
+
+ """
+ if self.debug:
+ self.log.warning('%s Will not clean up temp dir %s',
+ DEBUG_MODE, self.temp_directory)
+ return
+ if os.path.exists(self.temp_directory):
+ self.log.debug('Clean up %s', self.temp_directory)
+ shutil.rmtree(self.temp_directory)
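For context, here is a minimal sketch of how this loader would be driven end to end. It relies on the `load()` entry point inherited from `BufferedLoader` (assumed to call `pre_cleanup`, `prepare_origin_visit`, `prepare`, then alternate `fetch_data`/`store_data` until `fetch_data` returns False, and finally `cleanup`). The package name, origin url, and the 'date' epoch value below are illustrative placeholders, not values from this diff.

    from swh.loader.package.loader import GNULoader

    loader = GNULoader()
    loader.load(
        '8sync',                                   # package name
        'https://ftp.gnu.org/gnu/8sync/',          # origin url
        tarballs=[{
            'archive': 'https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz',
            'date': '1480991830',  # epoch seconds; the loader calls int() on it
        }],
    )

Each tarball dict needs at least the 'archive' and 'date' keys, which are the ones read by prepare_package_versions, build_revision and find_head.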
diff --git a/swh/loader/package/tasks.py b/swh/loader/package/tasks.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/package/tasks.py
@@ -0,0 +1,13 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from celery import current_app as app
+from swh.loader.package.loader import GNULoader
+
+
+@app.task(name=__name__ + '.LoadGNU')
+def load_gnu(name, origin_url=None, tarballs=None):
+ return GNULoader().load(name, origin_url,
+ tarballs=tarballs)
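A hedged example of scheduling the loader through the task above. The task name comes from the decorator in tasks.py; the broker configuration is assumed to be in place, and the payload values are illustrative.

    from celery import current_app as app

    app.send_task(
        'swh.loader.package.tasks.LoadGNU',
        args=('8sync', 'https://ftp.gnu.org/gnu/8sync/'),
        kwargs={'tarballs': [{
            'archive': 'https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz',
            'date': '1480991830',  # illustrative epoch value
        }]},
    )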