Changeset View
Standalone View
swh/loader/package/loader.py
- This file was added.
# Copyright (C) 2019 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import os | |||||
import shutil | |||||
import requests | |||||
try: | |||||
from _version import __version__ | |||||
except ImportError: | |||||
__version__ = 'devel' | |||||
from tempfile import mkdtemp | |||||
from swh.core import tarball | |||||
from swh.loader.core.utils import clean_dangling_folders | |||||
from swh.loader.core.loader import BufferedLoader | |||||
from swh.model.identifiers import normalize_timestamp | |||||
from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE | |||||
from swh.model.from_disk import Directory | |||||
from swh.model.identifiers import ( | |||||
identifier_to_bytes, revision_identifier, snapshot_identifier | |||||
) | |||||
# Marker prepended to log messages when the loader runs in debug mode
# (temporary directories are then kept for post-mortem inspection).
DEBUG_MODE = '** DEBUG MODE **'


class GNULoader(BufferedLoader):
    """Loader for GNU source release tarballs.

    Downloads, uncompresses and archives the release tarballs listed for
    a GNU package, building one synthetic revision per release and a
    snapshot referencing them all.
    """
    # Synthetic author/committer attached to every generated revision.
    SWH_PERSON = {
        'name': b'Software Heritage',
        'fullname': b'Software Heritage',
        'email': b'robot@softwareheritage.org'
    }
    # Message attached to every synthetic revision.
    REVISION_MESSAGE = b'swh-loader-package: synthetic revision message'
def __init__(self): | |||||
self.TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.gnu.' | |||||
super().__init__(logging_class='swh.loader.package.GNULoader') | |||||
self.dir_path = None | |||||
temp_directory = self.config['temp_directory'] | |||||
nahimilega: This is a problem it should be temp_directory = self.config['temp_directory']
But while… | |||||
Done Inline ActionsWhen you're running the worker, is the temp_directory key set in the config? If not then you need to add it. olasd: When you're running the worker, is the `temp_directory` key set in the config? If not then you… | |||||
os.makedirs(temp_directory, exist_ok=True) | |||||
Done Inline ActionsPlease remove as it's unused. ardumont: Please remove as it's unused.
| |||||
self.temp_directory = mkdtemp( | |||||
suffix='-%s' % os.getpid(), | |||||
prefix=self.TEMPORARY_DIR_PREFIX_PATTERN, | |||||
dir=temp_directory) | |||||
Done Inline ActionsThose needs to be cleaned up: T1532 (resulting from multiple team discussions). I know @anlambert said otherwise but still, that task explains it all. You can however reference the needed configuration as comments. ardumont: Those needs to be cleaned up: T1532 (resulting from multiple team discussions).
I know… | |||||
self.debug = self.config.get('debug', False) | |||||
Done Inline Actionsthat comes from self.config['temp_directory'] ardumont: that comes from self.config['temp_directory'] | |||||
self.session = requests.session() | |||||
self.params = { | |||||
'headers': { | |||||
'User-Agent': 'Software Heritage Loader (%s)' % ( | |||||
__version__ | |||||
) | |||||
} | |||||
} | |||||
def pre_cleanup(self): | |||||
"""To prevent disk explosion if some other workers exploded | |||||
in mid-air (OOM killed), we try and clean up dangling files. | |||||
""" | |||||
if self.debug: | |||||
self.log.warn('%s Will not pre-clean up temp dir %s' % ( | |||||
DEBUG_MODE, self.temp_directory | |||||
)) | |||||
return | |||||
clean_dangling_folders(self.temp_directory, | |||||
pattern_check=self.TEMPORARY_DIR_PREFIX_PATTERN, | |||||
log=self.log) | |||||
def prepare_origin_visit(self, name, origin_url, **kwargs): | |||||
"""Prepare package visit. | |||||
Args: | |||||
name (str): Package Name | |||||
origin_url (str): Package origin url | |||||
**kwargs: Arbitrary keyword arguments passed by the lister. | |||||
""" | |||||
# reset statuses | |||||
self._load_status = 'uneventful' | |||||
self._visit_status = 'full' | |||||
self.done = False | |||||
self.origin = { | |||||
'url': origin_url, | |||||
'type': 'gnu', | |||||
} | |||||
self.visit_date = None # loader core will populate it | |||||
def prepare(self, name, origin_url, **kwargs): | |||||
"""Prepare effective loading of source tarballs for a package manager | |||||
package. | |||||
Args: | |||||
name (str): Package Name | |||||
origin_url (str): Package origin url | |||||
**kwargs: Arbitrary keyword arguments passed by the lister. | |||||
""" | |||||
self.package_contents = [] | |||||
self.package_directories = [] | |||||
self.package_revisions = [] | |||||
self.all_version_data = [] | |||||
self.latest_timestamp = 0 | |||||
# Conceled the data into one dictionary to eleminate the need of | |||||
# passing all the paramaters when required in some method | |||||
self.package_details = { | |||||
'name': name, | |||||
'origin_url': origin_url, | |||||
'tarballs': kwargs['tarballs'], | |||||
} | |||||
self.package_temp_dir = os.path.join(self.temp_directory, | |||||
self.package_details['name']) | |||||
self.new_versions = \ | |||||
self.prepare_package_versions(self.package_details['tarballs']) | |||||
Done Inline ActionsPlease stop using indirection, it's hard to follow through in plain text. When the need to refactor into submethods arise, let's do it, not before. ardumont: Please stop using indirection, it's hard to follow through in plain text.
Be simple in your… | |||||
Done Inline ActionsI converted this into a separate method to ease up in the understanding of this step. Although I will shift the working of this method in prepare() and put an inline comment there. nahimilega: I converted this into a separate method to ease up in the understanding of this step. Although… | |||||
def prepare_package_versions(self, tarballs): | |||||
""" | |||||
Done Inline ActionsPlease, use some syntax checker (flycheck, flyspell, etc...) Concealed this data into one dictionary to eliminate... Also, more generally, when in a submethod, please use the docstring to mention those. ardumont: Please, use some syntax checker (flycheck, flyspell, etc...)
`Concealed this data into one… | |||||
Instantiate a generator that will process a specific package release | |||||
version at each iteration step. The following operations will be | |||||
performed: | |||||
1. Create a temporary directory to download and extract the | |||||
Done Inline Actionsself.package_details = { 'name': name, 'origin_url': origin_url, 'tarballs': kwargs['tarballs'], } Thus really inline this in the caller method. ardumont: ```
self.package_details = {
'name': name,
'origin_url': origin_url,
'tarballs'… | |||||
release tarball. | |||||
2. Download the tarball. | |||||
3. Uncompress the tarball. | |||||
4. Parse the file associated to the package version to extract | |||||
metadata (optional). | |||||
5. Delete unnecessary files (optional). | |||||
Args: | |||||
tarballs (list): a list of dicts containing information about the | |||||
respective tarball that is provided by lister. | |||||
known_versions (dict): may be provided by the loader, it enables | |||||
to filter out versions already ingested in the archive. | |||||
Yields: | |||||
Tuple[dict, str]: tuples containing the following | |||||
members: | |||||
* a dict holding package tarball information and metadata | |||||
* a string holding the path of the uncompressed package to | |||||
load into the archive | |||||
""" | |||||
for package_version_data in tarballs: | |||||
tarball_url = package_version_data['archive'] | |||||
tarball_request = self._request(tarball_url, | |||||
throw_error=False) | |||||
if tarball_request.status_code == 404: | |||||
self.log.warn('Tarball url %s returns a 404 error.', | |||||
tarball_url) | |||||
self._visit_status = 'partial' | |||||
# FIX ME: Do we need to mark it `partial` here | |||||
continue | |||||
yield self._prepare_package_version(package_version_data, | |||||
tarball_request) | |||||
Done Inline ActionsMake that a warn instead. By the way, i'm not sure we want to make that a partial visit [1] [1] If an archive listed is 404, then we most probably never recover it. ardumont: Make that a `warn` instead.
That's more critical, all the more reason since it triggers a… | |||||
Not Done Inline ActionsI have added a FIX ME here to address this issue ;) nahimilega: I have added a `FIX ME` here to address this issue ;) | |||||
def _request(self, url, throw_error=True): | |||||
"""Request the remote tarball url. | |||||
Args: | |||||
url (str): Url (file or http*). | |||||
Raises: | |||||
ValueError in case of failing to query. | |||||
Returns: | |||||
Tuple of local (filepath, hashes of filepath). | |||||
""" | |||||
response = self.session.get(url, **self.params, stream=True) | |||||
if response.status_code != 200 and throw_error: | |||||
raise ValueError("Fail to query '%s'. Reason: %s" % ( | |||||
url, response.status_code)) | |||||
return response | |||||
def _prepare_package_version(self, package_version_data, tarball_request): | |||||
"""Process the package release version. | |||||
The following operations are performed: | |||||
1. Download the tarball | |||||
2. Uncompress the tarball | |||||
3. Delete unnecessary files (optional) | |||||
4. Parse the file associated to the package version to extract | |||||
metadata (optional) | |||||
Args: | |||||
package_version_data (dict): containing information | |||||
about the focused package version. | |||||
known_versions (dict): may be provided by the loader, it enables | |||||
to filter out versions already ingested in the archive. | |||||
Return: | |||||
Tuple[dict, str]: tuples containing the following | |||||
members: | |||||
* a dict holding package tarball information and metadata | |||||
* a string holding the path of the uncompressed package to | |||||
load into the archive | |||||
""" | |||||
url = package_version_data['archive'] | |||||
tarball_path, hashes = self.download_generate_hash(tarball_request, | |||||
url) | |||||
uncompressed_path = os.path.join(self.package_temp_dir, 'uncompressed', | |||||
os.path.basename(url)) # SEE ME | |||||
self.uncompress_tarball(tarball_path, uncompressed_path) | |||||
# remove tarball | |||||
os.remove(tarball_path) | |||||
if self.tarball_invalid: | |||||
return None, None | |||||
return package_version_data, uncompressed_path | |||||
Done Inline ActionsI don't think we want to keep 'nature' key (which is not that great of a name, my bad). IIRC, it's the tarball's type 'tar' or 'zip' here. ardumont: I don't think we want to keep 'nature' key (which is not that great of a name, my bad).
IIRC… | |||||
def download_generate_hash(self, response, url): | |||||
"""Store file in temp directory and computes hash of its filepath. | |||||
Args: | |||||
response (Response): Server response of the url | |||||
url (str): Url of the tarball | |||||
Returns: | |||||
Tuple of local (filepath, hashes of filepath) | |||||
""" | |||||
length = int(response.headers['content-length']) | |||||
os.makedirs(self.package_temp_dir, exist_ok=True) | |||||
# SEE ME | |||||
filepath = os.path.join(self.package_temp_dir, os.path.basename(url)) | |||||
# Convert the server response to a file. | |||||
h = MultiHash(length=length) | |||||
with open(filepath, 'wb') as f: | |||||
for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE): | |||||
h.update(chunk) | |||||
f.write(chunk) | |||||
# Check for the validity of the tarball downloaded. | |||||
actual_length = os.path.getsize(filepath) | |||||
if length != actual_length: | |||||
raise ValueError('Error when checking size: %s != %s' % ( | |||||
length, actual_length)) | |||||
hashes = { | |||||
'length': length, | |||||
**h.hexdigest() | |||||
} | |||||
return filepath, hashes | |||||
def uncompress_tarball(self, filepath, path): | |||||
"""Uncompress a tarball. | |||||
Args: | |||||
filepath (str): Path of tarball to uncompress | |||||
path (str): The destination folder where to uncompress the tarball | |||||
Returns: | |||||
The nature of the tarball, zip or tar. | |||||
""" | |||||
try: | |||||
self.tarball_invalid = False | |||||
tarball.uncompress(filepath, path) | |||||
except Exception: | |||||
self.tarball_invalid = True | |||||
self._visit_status = 'partial' | |||||
def fetch_data(self): | |||||
"""Called once per release artifact version (can be many for one | |||||
Done Inline ActionsRemove that comment. If it's really necessary it's not in the right place. ardumont: Remove that comment.
If it's really necessary it's not in the right place. | |||||
release). | |||||
This will for each call: | |||||
- retrieve a release artifact (associated to a release version) | |||||
- Computes the swh objects | |||||
Returns: | |||||
Done Inline ActionsPrefer one return statement. Also a warn log statement when detecting the invalid tarball. uncompressed_path = None self.tarball_invalid = False try: ... uncompressed_path = tarball.uncompress... except: self.log.warn('Invalid tarball detected...') self.tarball_invalid return path ardumont: Prefer one return statement. Also a warn log statement when detecting the invalid tarball.
```… | |||||
True as long as data to fetch exist | |||||
""" | |||||
data = None | |||||
if self.done: | |||||
return False | |||||
try: | |||||
data = next(self.new_versions) | |||||
self._load_status = 'eventful' | |||||
except StopIteration: | |||||
self.done = True | |||||
return False | |||||
package_version_data, dir_path = data | |||||
# package release tarball was corrupted | |||||
if self.tarball_invalid: | |||||
return not self.done | |||||
dir_path = dir_path.encode('utf-8') | |||||
directory = Directory.from_disk(path=dir_path, data=True) | |||||
objects = directory.collect() | |||||
if 'content' not in objects: | |||||
objects['content'] = {} | |||||
if 'directory' not in objects: | |||||
objects['directory'] = {} | |||||
self.package_contents = objects['content'].values() | |||||
self.package_directories = objects['directory'].values() | |||||
revision = self.build_revision(directory, | |||||
package_version_data) | |||||
revision['id'] = identifier_to_bytes( | |||||
revision_identifier(revision)) | |||||
self.package_revisions.append(revision) | |||||
self.log.debug(revision) | |||||
package_version_data['id'] = revision['id'] | |||||
self.all_version_data.append(package_version_data) | |||||
# To find the latest version | |||||
if self.latest_timestamp < int(package_version_data['date']): | |||||
self.latest_timestamp = int(package_version_data['date']) | |||||
self.log.debug('Removing unpacked package files at %s', dir_path) | |||||
shutil.rmtree(dir_path) | |||||
return not self.done | |||||
def build_revision(self, directory, package_version_data): | |||||
normalize_date = normalize_timestamp(int(package_version_data['date'])) | |||||
return { | |||||
'metadata': { | |||||
'package': { | |||||
'date': package_version_data['date'], | |||||
'archive': package_version_data['archive'], | |||||
}, | |||||
}, | |||||
'date': normalize_date, | |||||
'committer_date': normalize_date, | |||||
'author': self.SWH_PERSON, | |||||
'committer': self.SWH_PERSON, | |||||
'type': 'tar', | |||||
'message': self.REVISION_MESSAGE, | |||||
'directory': directory.hash, | |||||
'synthetic': True, | |||||
'parents': [], | |||||
} | |||||
Done Inline ActionsMake that 'tar'. ardumont: Make that 'tar'.
We don't want to see 'zip' or something else here. | |||||
def store_data(self): | |||||
"""Store fetched data in the database. | |||||
""" | |||||
Not Done Inline ActionsI'm not sure about this not being populated. @olasd when you have a sec, what do you think? That could be iterated over in a future iteration step, so i'm not blocking the diff for it ardumont: I'm not sure about this not being populated.
For information that's the revision parent list. | |||||
Not Done Inline Actions@anlambert i should have asked you also ^^ ardumont: @anlambert i should have asked you also ^^ | |||||
Not Done Inline Actions@ardumont, currently we do not populate the parents field in the PyPI and npm loader. We simply create a revision for each package version and reference them in a snapshot each time we visit the package. Nevertheless, maybe it could be interesting to populate it with the revision pointing to the previous package version. This is semantically correct but as our data model is based on VCS ones I am not sure if we can do that. For instance, this could be useful to easily generate diffs between two successive package version in the web application. anlambert: @ardumont, currently we do not populate the `parents` field in the PyPI and npm loader. We… | |||||
Not Done Inline Actions[this is an aside from the code review, just reacting to @anlambert's remark on pointing at previous package versions] For a given package version, determining the logical "previous package version" is not that easy, for instance when you consider long term support branches. A practical example follows. Let's say that on the first visit, you find versions 1.0, 1.1, 1.2, 2.0 and 2.1. Your heuristic may generate the following edges:
On the second visit, you discover versions 1.0, 1.1, 1.2, 1.3, 2.0, 2.1, 2.2 and 3.0. What edges should the heuristic generate then?
We could refine the heuristic by parsing the changelog, if it exists, as that is a more explicit indication of parenthood. That's what the Debian loader did, originally: it's fairly easy to do there because there's a mandatory changelog file in a machine-parseable format (well, at least for "recent" packages), and tools in the ecosystem, for instance the bug tracking system, depend on the maintainers properly tracking the version lineage there. The issue with tracking package version parenthood/history within the revisions themselves then becomes: what happens if a previous version is missing in the Software Heritage archive? This really makes the PID of a given package dependent on the previous contents of the archive, rather than being somewhat self-contained and only depending on the package data itself. So far we've erred on the side of keeping stuff as self-contained as possible. If we find a decent heuristic to parse changelogs, we can build upon that to provide the information after the fact. olasd: [this is an aside from the code review, just reacting to @anlambert's remark on pointing at… | |||||
self.maybe_load_contents(self.package_contents) | |||||
self.maybe_load_directories(self.package_directories) | |||||
self.maybe_load_revisions(self.package_revisions) | |||||
if self.done: | |||||
self.generate_and_load_snapshot() | |||||
self.flush() | |||||
def generate_and_load_snapshot(self): | |||||
"""Generate and load snapshot for the package visit. | |||||
""" | |||||
branches = {} | |||||
for version_data in self.all_version_data: | |||||
branch_name = self.find_branch_name(version_data['archive']) | |||||
target = self.target_from_version(version_data['id']) | |||||
branches[branch_name] = target | |||||
branches = self.find_head(branches, branch_name, | |||||
version_data['date']) | |||||
Done Inline ActionsI do not think that indirection is necessary at the moment. Simply explaining in the docstring that the generate_and_load_snapshot will gather package versions as branches and generate a snapshot from them is sufficient to understand what is is intended to do. anlambert: I do not think that indirection is necessary at the moment. Simply explaining in the docstring… | |||||
if not target: | |||||
self._visit_status = 'partial' | |||||
snapshot = { | |||||
'branches': branches, | |||||
} | |||||
snapshot['id'] = identifier_to_bytes(snapshot_identifier(snapshot)) | |||||
self.maybe_load_snapshot(snapshot) | |||||
def find_branch_name(self, url): | |||||
Not Done Inline Actionsanlambert: In the PyPI and npm loader, branches are named the following: `releases/<package_version>` (see… | |||||
Not Done Inline Actionss/coherency/consistency/ anlambert: s/coherency/consistency/ | |||||
Not Done Inline ActionsWith gnu, it is not possible to get the package_version, so here I used the basename of url (as shown in docstring). Hope that is fine ;) nahimilega: With gnu, it is not possible to get the package_version, so here I used the basename of url (as… | |||||
"""Extract branch name from tarball url | |||||
Args: | |||||
url (str): Tarball URL | |||||
Returns: | |||||
byte: Branch name | |||||
Example: | |||||
For url = https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz | |||||
>>> find_branch_name(url) | |||||
b'release/8sync-0.2.0' | |||||
""" | |||||
branch_name = '' | |||||
filename = os.path.basename(url) | |||||
filename_parts = filename.split(".") | |||||
if len(filename_parts) > 1 and filename_parts[-2] == 'tar': | |||||
for part in filename_parts[:-2]: | |||||
branch_name += '.' + part | |||||
elif len(filename_parts) > 1 and filename_parts[-1] == 'zip': | |||||
for part in filename_parts[:-1]: | |||||
branch_name += '.' + part | |||||
return (('release/%s') % branch_name[1:]).encode('ascii') | |||||
def find_head(self, branches, branch_name, timestamp): | |||||
"""Make branch head. | |||||
Checks if the current version is the latest version. Make it as head | |||||
if it is the latest version. | |||||
Args: | |||||
branches (dict): Branches for the focused package. | |||||
branch_name (str): Branch name | |||||
Returns: | |||||
dict: Branches for the focused package | |||||
""" | |||||
if self.latest_timestamp == int(timestamp): | |||||
branches[b'HEAD'] = { | |||||
'target_type': 'alias', | |||||
'target': branch_name, | |||||
} | |||||
return branches | |||||
def target_from_version(self, revision_id): | |||||
return { | |||||
'target': revision_id, | |||||
'target_type': 'revision', | |||||
} if revision_id else None | |||||
def load_status(self): | |||||
return { | |||||
'status': self._load_status, | |||||
} | |||||
def visit_status(self): | |||||
return self._visit_status | |||||
def cleanup(self): | |||||
"""Clean up temporary disk use after downloading and extracting | |||||
package tarballs. | |||||
""" | |||||
if self.debug: | |||||
self.log.warn('%s Will not clean up temp dir %s' % ( | |||||
DEBUG_MODE, self.temp_directory | |||||
)) | |||||
return | |||||
if os.path.exists(self.temp_directory): | |||||
self.log.debug('Clean up %s' % self.temp_directory) | |||||
shutil.rmtree(self.temp_directory) |
This is a problem: it should be `temp_directory = self.config['temp_directory']`.
But while running, that raises a KeyError: the key 'temp_directory' is not found in the config.
Hence I temporarily changed it to this.
How can I fix this — is there something I need to add to the loader's worker configuration?