swh/loader/base/loader.py
- This file was added.
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import shutil

from abc import abstractmethod
from tempfile import mkdtemp

from swh.core import tarball
from swh.loader.core.utils import clean_dangling_folders
from swh.loader.core.loader import BufferedLoader
from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE
from swh.model.from_disk import Directory
from swh.model.identifiers import (
    identifier_to_bytes, revision_identifier
)
from swh.storage.algos.snapshot import snapshot_get_all_branches

from .abstractattribute import AbstractAttribute
from .build import construct_revision

DEBUG_MODE = '** DEBUG MODE **'
anlambert: How about renaming that class to `PackageLoader`? This feels more explicit about what it is intended to do.

ardumont: I like it better indeed!
About moving these files: I asked to move those back in core; keep it there at the moment. That's what we (olasd, nahimilega and me) agreed on from the start.
class BaseLoader(BufferedLoader, construct_revision):
    """
    Required Overrides:
        loader_name
        class_name
        def convert_to_standard_format

    Optional Overrides:
        def cleanup_artifact
        def extract_metadata
    """

ardumont: Improve the description here. For inspiration, you can have a look at the `swh.model.merkle.MerkleNode` class.

    loader_name = AbstractAttribute("Name of the package manager")  # e.g. pypi
    class_name = AbstractAttribute("Name of the loader class")  # e.g. PyPILoader
    def __init__(self):
        super().__init__(logging_class='swh.loader.%s.%s' % (self.loader_name,
                                                             self.class_name))
        self.TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.%s.' % self.loader_name
        self.CONFIG_BASE_FILENAME = 'loader/%s' % self.loader_name
        self.ADDITIONAL_CONFIG = {
            'temp_directory': ('str',
                               '/tmp/swh.loader.%s/' % self.loader_name),
            'cache': ('bool', False),
            'cache_dir': ('str', ''),
            'debug': ('bool', False),  # NOT FOR PRODUCTION
        }
        self.local_cache = None
ardumont: Those should be docstrings, not values of the variables:
```
loader_name = None
"""Package manager name"""

class_name = None
"""Loader class name"""
```
etc...
        self.dir_path = None
        temp_directory = self.config['temp_directory']
        os.makedirs(temp_directory, exist_ok=True)
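The keys registered in `ADDITIONAL_CONFIG` above would typically be satisfied by a small YAML configuration file. A purely illustrative sketch (the filename and values are assumptions, not part of this diff):

```yaml
# loader-pypi.yml -- hypothetical configuration matching ADDITIONAL_CONFIG
temp_directory: /tmp/swh.loader.pypi/
cache: false
cache_dir: ''
debug: false  # NOT FOR PRODUCTION
```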
ardumont: If those are for the configuration file to load, we are moving away from this (even though that's not apparent in the code). We use the environment variable stanza (I explained it to you on IRC) to deploy those, be that in docker-dev or the production environment.

nahimilega: Shall I also mention the environment variable stanza in the docstring or someplace? Like:
```
export SWH_CONFIG_FILENAME=/path/to/loader-gnu.yml
```

ardumont: I don't really know where to define that part without duplicating... (it's already in docker-dev for other loaders and in prod, since we need it to work there ;)
        self.temp_directory = mkdtemp(
            suffix='-%s' % os.getpid(),
            prefix=self.TEMPORARY_DIR_PREFIX_PATTERN,
            dir=temp_directory)
        self.debug = self.config.get('debug', False)

ardumont: Also, I'd need to check, but the additional_config is also something to deprecate (so to remove here)... Rationale: if it's not maintained anymore, it's dead code, so that must go away.
    @abstractmethod
    def convert_to_standard_format(self, **kwargs):
        """Fetch the metadata and convert it into a standard format.

        The standard format is a dict with the keys:

        `name` (str): holding the name of the package
        `origin_url` (str): holding the origin_url of the package
        `tarballs` (list): a list of dicts where each dict contains
            information related to a single version of the package.
            The `url` key is mandatory and holds the tarball url; other
            keys are optional, as per the availability of metadata.

        Note: the keys `nature` and `response` are reserved keywords and
        cannot be used in the dicts present under the `tarballs` key.

        Args:
            **kwargs: Arbitrary keyword arguments passed by the lister.

        Returns:
            dict: Containing information as directed by the guidelines
                mentioned above.

        Example:
            {
                'name': '8sync',
                'origin_url': 'https://ftp.gnu.org/gnu/8sync/',
                'tarballs': [
                    {'url': 'https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz',
                     'time_modified': 1562878592},
                    {'url': 'https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz',
                     'time_modified': 1599887203},
                    ...
                ]
            }
        """
        pass
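To make the contract above concrete, here is a minimal, hypothetical subclass sketch. The `GNULoader` name and the hard-coded tarball entry are assumptions for illustration, not part of this diff, and the real class would derive from `BaseLoader`:

```python
# Hypothetical sketch of a subclass honouring the standard format
# described in the docstring above; names and data are illustrative.

class GNULoader:  # would derive from BaseLoader in the real code
    def convert_to_standard_format(self, **kwargs):
        # A real implementation would query the lister-provided URL for
        # metadata; here the single tarball entry is hard-coded.
        return {
            'name': kwargs['name'],
            'origin_url': kwargs['origin_url'],
            'tarballs': [
                {'url': kwargs['origin_url'] + '8sync-0.1.0.tar.gz',
                 'time_modified': 1562878592},
            ],
        }
```

Note that the returned dict avoids the reserved `nature` and `response` keys inside the `tarballs` entries, as required.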
    def cleanup_artifact(self, uncompressed_path):
        """Clean up unnecessary files from the downloaded tarball, and
        perform some special operations if needed.

        The implementation of this method depends on the file structure
        of the tarball. It is used to remove files from the uncompressed
        tarball that are not to be archived (e.g. binaries).

        Args:
            uncompressed_path (str): Path of the uncompressed tarball

        Returns:
            uncompressed_path (str): Path of the uncompressed tarball
                after removing unnecessary files
        """
        return uncompressed_path

anlambert: From my point of view, this method is useless and could be removed if we slightly update the package listers implementation (GNU, CRAN, npm, PyPI, ...). Those listers should provide the following parameters to the loader: name, origin_url and tarballs.

ardumont: I agree again. Note that it means a migration from the existing recurring tasks in the scheduler db (out of scope for this task ;). If we converge with a use case, we can change the lister implementation to unify…

nahimilega: I am again not sure we should do this because, as olasd once said, we parallelize the loading task. Hence we should not keep the API call (to get metadata) step in the listers.

ardumont: Something rings a bell indeed, but your sentences are a bit unclear. I'm not sure I see the causality between parallelizing the loading task and moving the API call to fetch metadata loader-side. That a lister takes some time to list is not per se a problem. In any case, if this method is the one that solves the metadata loading, that's not clear from its name.

nahimilega: My point was: when I was making the packagist lister, in the first pass I kept the metadata retrieval step (it would send an API request for each package to get metadata) in the lister itself. While testing it on my PC, it took about 3 hours to list only about 5% of the total packages. Now, if this task is done during loading, the lister will pass the metadata URL of the package to the loader, and we will have a separate loading task for each package. If we can parallelise the tasks (i.e. run many loading tasks at the same time), then we could make metadata requests for many packages at the same time, which would make the whole process fast. One more point: it will make the whole system more robust. Suppose there are exceptions in the metadata API for one or two packages, like some field not being present, or a field that was present when the lister was developed but has now been removed. Hope it is a bit clearer what I am trying to say ;)
I will try to make it more explicit, but actually the task of that method is to make the metadata request (if needed) and convert the output coming from the lister to a standard format which is used in the loader. The standard format is the one shown in the docstring.

anlambert: @nahimilega, I totally agree with you that the metadata extraction for a package must be handled loader-side. This indeed calls for a dedicated method in the PackageLoader class, which derived ones must reimplement, to perform that task. But keep in mind that metadata extraction must be performed for each version of a package, and that most of the time there is no need to perform an API call to a remote service, as package contents usually provide a file filled with this information that can be easily parsed (package.json, PKG-INFO, ...). So trying to extract metadata before iterating on each version of a package to load into the archive is not the right solution. I was also confused by the naming of the method; convert_to_standard_format is not really explicit about its role. As @ardumont explained, every processing step for loading the different versions of a package into the archive should be clearly identified, with enough granularity to cover all package managers available in the wild. This implies a lot of brainstorming to do it well and to define an API that will be easily usable and maintainable ;-) Let's continue that work next week!

nahimilega: But for the package managers where we don't even know the versions (e.g. PyPI) and we need to make an API call to get a list of package versions, convert_to_standard_format can be used in that case. And for the extraction of metadata from the provided file (package.json, PKG-INFO, ...), the extract_metadata method can be used; it iterates over each version of a package.
    def extract_metadata(self, package_path, package_source_data):
        """Fetch the metadata from the downloaded file.
        """
        return package_source_data
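As a hedged sketch of what such an override could look like for a package manager that ships a `PKG-INFO`-style metadata file: the file name, the `Key: value` parsing, and the standalone-function form are assumptions for illustration, not part of this diff.

```python
import os


def extract_pkg_info(package_path, package_source_data):
    # Hypothetical extract_metadata override: parse a PKG-INFO style
    # "Key: value" file found in the unpacked package, and attach the
    # parsed fields to the per-version dict under a 'metadata' key.
    pkg_info = os.path.join(package_path, 'PKG-INFO')
    if not os.path.exists(pkg_info):
        return package_source_data  # nothing to extract
    metadata = {}
    with open(pkg_info) as f:
        for line in f:
            if ':' in line:
                key, _, value = line.partition(':')
                metadata[key.strip().lower()] = value.strip()
    package_source_data['metadata'] = metadata
    return package_source_data
```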
    # You probably don't need to override anything below this line.
    def prepare_origin_visit(self, **kwargs):
        """Prepare package visit.

        Args:
            **kwargs: Arbitrary keyword arguments passed by the lister.
        """
        # reset statuses
        self._load_status = 'uneventful'
        self._visit_status = 'full'
        self.done = False
        # fetch the package metadata from the registry
        self.package_details = self.convert_to_standard_format(**kwargs)
        self.origin = {
            'url': self.package_details['origin_url'],
            'type': self.loader_name,
        }
        self.visit_date = None  # loader core will populate it

anlambert: The correct signature of this method is:
```lang=python
def prepare_origin_visit(self, *args, **kwargs):
```
The loader will otherwise fail when executed by a celery worker (this is what we use in production to schedule and execute the loading of software origins into the archive).
    def prepare(self, **kwargs):
        """Prepare the effective loading of source tarballs for a
        package.

        Args:
            **kwargs: Arbitrary keyword arguments passed by the lister.
        """
        self.contents = []
        self.directories = []
        self.revisions = []
        self.package_temp_dir = os.path.join(self.temp_directory,
                                             self.package_details['name'])
        last_snapshot = self.last_snapshot()
        self.known_versions = self.known_versions(last_snapshot)
        self.new_artifacts = \
            self.prepare_package_versions(self.package_details['tarballs'],
                                          self.known_versions)

anlambert: If we remove the `convert_to_standard_format` method and ensure consistent inputs provided by the listers to that loader, this could be changed into: `self.package_details = kwargs`

nahimilega: I don't think that's a good idea. `convert_to_standard_format` is provided for loaders where we need to make an API call to get the metadata (e.g. PyPI).

anlambert: Same here, the correct signature is:
```lang=python
def prepare(self, *args, **kwargs):
```
    def _prepare_package_version(self, package_source_data, tarball_request):
        """Process a package release version.

        The following operations are performed:

        1. Download the tarball
        2. Uncompress the tarball
        3. Delete unnecessary files (optional)
        4. Parse the file associated to the package version to extract
           metadata (optional)

        Args:
            package_source_data (list): a list of dicts containing
                information about the respective tarball that is
                provided by the lister.
            known_versions (dict): may be provided by the loader; it
                enables filtering out versions already ingested in the
                archive.

        Returns:
            Tuple[dict, str]: a tuple containing the following members:

                * a dict holding package tarball information and metadata
                * a string holding the path of the uncompressed package
                  to load into the archive
        """
        url = package_source_data['url']
        tarball_path, hashes = self.download_generate_hash(tarball_request,
                                                           url)
        uncompressed_path = os.path.join(self.package_temp_dir, 'uncompressed',
                                         url)  # SEE ME
        package_source_data['nature'] = self.uncompress_tarball(
            tarball_path, uncompressed_path)
        # remove tarball
        os.remove(tarball_path)
        if self.tarball_invalid:
            return None, None
        package_path = self.cleanup_artifact(uncompressed_path)
        package_source_data = self.extract_metadata(package_path,
                                                    package_source_data)
        return package_source_data, package_path

nahimilega: Change the function name from `known_versions()` to `known_version()` because there is a class variable `self.known_versions`.

anlambert: I would rather rename it to `get_known_versions`; the plural use is important here.

ardumont: I agree!

anlambert: `new_versions` seems a better name here.
    def fetch_data(self):
        """Called once per release artifact version (can be many for one
        release).

        This will, for each call:

        - retrieve a release artifact (associated to a release version)
        - compute the swh objects

        Returns:
            True as long as data to fetch exist
        """
        data = None
        if self.done:
            return False
        try:
            data = next(self.new_artifacts)
            self._load_status = 'eventful'
        except StopIteration:
            self.done = True
            return False
        package_source_data, dir_path = data
        # package release tarball was corrupted
        if self.tarball_invalid:
            return not self.done
        dir_path = dir_path.encode('utf-8')
        directory = Directory.from_disk(path=dir_path, data=True)
        objects = directory.collect()
        if 'content' not in objects:
            objects['content'] = {}
        if 'directory' not in objects:
            objects['directory'] = {}
        self.contents = objects['content'].values()
        self.directories = objects['directory'].values()
        revision = self.compute_revision(directory,
                                         package_source_data)  # FIX ME
        revision['id'] = identifier_to_bytes(
            revision_identifier(revision))
        self.revisions.append(revision)
        # Change me to be compatible with if-modified-since
        package_key = package_source_data[self.compare_field]  # check for this
        self.known_versions[package_key] = revision['id']  # SEE ME
        self.log.debug('Removing unpacked package files at %s', dir_path)
        shutil.rmtree(dir_path)
        return not self.done
    def last_snapshot(self):
        """Retrieve the last snapshot of the package, if any.
        """
        # Done
        visit = self.storage.origin_visit_get_latest(
            self.origin['url'], require_snapshot=True)
        if visit:
            return snapshot_get_all_branches(self.storage, visit['snapshot'])

nahimilega: Previously the function definition was `def check_file(self, length, filepath):` but it was called as `self.check_file(filepath, length)`, which led to assigning the filepath value to the length variable. Fixed this bug.

nahimilega: Can you please check once for this method that the docstring is appropriate and correctly explains what is done in the function.

ardumont: It's the method we use everywhere to compute hashes on a file. Given one response which holds a raw file to some request, this method does 2 things: it writes the file to disk and computes its hashes. Implementation-wise, this does this in one round-trip. It then returns the hashes (dict) of the file. IIRC, the writing part was done in a cache (on disk).
    def store_data(self):
        """Store fetched data in the database.
        """
        # Done
        self.maybe_load_contents(self.contents)
        self.maybe_load_directories(self.directories)
        self.maybe_load_revisions(self.revisions)
        if self.done:
            self.generate_and_load_snapshot()
            self.flush()

    def generate_and_load_snapshot(self):
        """
        Make me
        """
        pass
    def download_generate_hash(self, response, url):
        """Store the file in a temp directory and compute the hashes of
        its content.

        Args:
            response (Response): Server response for the url
            url (str): Url of the tarball

        Returns:
            Tuple of (local filepath, hashes of the file content)
        """
        # Done
        length = int(response.headers['content-length'])
        # SEE ME
        filepath = os.path.join(self.package_temp_dir, os.path.basename(url))
        h = MultiHash(length=length)
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE):
                h.update(chunk)
                f.write(chunk)
        actual_length = os.path.getsize(filepath)
        if length != actual_length:
            raise ValueError('Error when checking size: %s != %s' % (
                length, actual_length))
        hashes = {
            'length': length,
            **h.hexdigest()
        }
        return filepath, hashes
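The single-pass write-and-hash pattern used above comes from `swh.model.hashutil.MultiHash`. A dependency-free sketch of the same idea using only `hashlib` (the function name and the choice of hash algorithms are assumptions for illustration) might look like:

```python
import hashlib

HASH_BLOCK_SIZE = 32768  # chunk size, mirroring the constant used above


def write_and_hash(chunks, filepath):
    # Write a stream of byte chunks to disk while feeding the same
    # chunks to several hash objects, all in a single pass.
    hashers = {name: hashlib.new(name) for name in ('sha1', 'sha256')}
    length = 0
    with open(filepath, 'wb') as f:
        for chunk in chunks:
            for h in hashers.values():
                h.update(chunk)
            f.write(chunk)
            length += len(chunk)
    hashes = {name: h.hexdigest() for name, h in hashers.items()}
    hashes['length'] = length
    return hashes
```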
    def uncompress_tarball(self, filepath, path):
        """Uncompress a tarball.

        Args:
            filepath (str): Path of the tarball to uncompress
            path (str): The destination folder where to uncompress the
                tarball

        Returns:
            The nature of the tarball, zip or tar.
        """
        # Done
        # filepath = tempdir + url
        try:
            self.tarball_invalid = False
            return tarball.uncompress(filepath, path)
        except Exception:
            self.tarball_invalid = True
            return None
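`swh.core.tarball.uncompress` does the real work here. A rough stdlib-only approximation of the idea (detect the archive nature, extract it, and return `'zip'` or `'tar'` as the docstring describes) could be sketched as follows; note it performs no path-traversal hardening and is not a substitute for the real helper:

```python
import os
import tarfile
import zipfile


def uncompress(filepath, dest):
    # Detect the archive nature, extract it into dest, and return the
    # nature string. Sketch only: no hardening against malicious paths.
    os.makedirs(dest, exist_ok=True)
    if zipfile.is_zipfile(filepath):
        with zipfile.ZipFile(filepath) as z:
            z.extractall(dest)
        return 'zip'
    if tarfile.is_tarfile(filepath):
        with tarfile.open(filepath) as t:
            t.extractall(dest)
        return 'tar'
    raise ValueError('%s is not a supported archive' % filepath)
```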
    def pre_cleanup(self):
        """To prevent disk explosion if some other workers exploded
        in mid-air (OOM killed), we try and clean up dangling files.
        """
        # Done
        if self.debug:
            self.log.warn('%s Will not pre-clean up temp dir %s' % (
                DEBUG_MODE, self.temp_directory
            ))
            return
        clean_dangling_folders(self.config['temp_directory'],
                               pattern_check=self.TEMPORARY_DIR_PREFIX_PATTERN,
                               log=self.log)
    def flush(self):
        """Flush any potential dangling data not sent to swh-storage.

        Bypass the maybe_load_* methods, which await the
        threshold-reached signal. We actually want to store those, as we
        are done loading.
        """
        # Done
        contents = self.contents.pop()
        directories = self.directories.pop()
        revisions = self.revisions.pop()
        releases = self.releases.pop()
        # and send those to storage if asked
        if self.config['send_contents']:
            self.send_batch_contents(contents)
        if self.config['send_directories']:
            self.send_batch_directories(directories)
        if self.config['send_revisions']:
            self.send_batch_revisions(revisions)
        if self.config['send_releases']:
            self.send_batch_releases(releases)
        if self.config['send_snapshot'] and self.snapshot:
            self.send_snapshot(self.snapshot)
    def cleanup(self):
        """Clean up temporary disk use after downloading and extracting
        package tarballs.
        """
        # Done
        if self.debug:
            self.log.warn('%s Will not clean up temp dir %s' % (
                DEBUG_MODE, self.temp_directory
            ))
            return
        if os.path.exists(self.temp_directory):
            self.log.debug('Clean up %s' % self.temp_directory)
            shutil.rmtree(self.temp_directory)

ardumont: Feels like those `cleanup` and `pre_cleanup` are already implemented elsewhere (in the loader-core?)...

nahimilega: https://forge.softwareheritage.org/source/swh-loader-core/browse/master/swh/loader/core/loader.

ardumont: Right, that must be in the other loaders then (mercurial, svn, pypi, npm).
ardumont: I don't understand that method.

nahimilega: There is a HEAD for every branch that is created by the loader. Determining it is possible in the NPM loader because it can find the latest release, since the version of the package is provided in the metadata. But there could be some package managers (in fact many) which do not provide the version metadata, or any metadata which could be used to determine the latest release and hence the HEAD. So I am not sure how to properly identify the head of the branch; hence, for testing purposes, I have done `if True:`. One possible way out could be to just leave this method to be implemented in all the loaders separately (as done for `convert_to_standard_format()`). Is there any better approach to solve this problem? And is it necessary to have a HEAD for the branch?

nahimilega: Improve docstring.

nahimilega: Same problem of unavailability of metadata as mentioned in the inline comment on `find_head()`. Is there any better approach to solve this problem?
anlambert: How about renaming that class to `PackageLoader`? This feels more explicit about what it is intended to do.
Also, I would rename the folder containing these new source files from `base` to `package`.
Maybe this could also be moved in a new dedicated swh-loader-package repository, but let's keep it there at the moment.