diff --git a/swh/loader/base/__init__.py b/swh/loader/base/__init__.py
new file mode 100644
diff --git a/swh/loader/base/abstractattribute.py b/swh/loader/base/abstractattribute.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/base/abstractattribute.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2019 The Software Heritage developers
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class AbstractAttribute:
+    """AbstractAttributes in a base class must be overridden by the subclass.
+
+    It's like the :func:`abc.abstractmethod` decorator, but for things that
+    are explicitly attributes/properties, not methods, without the need for
+    empty method def boilerplate. Like abc.abstractmethod, the class containing
+    AbstractAttributes must inherit from :class:`abc.ABC` or use the
+    :class:`abc.ABCMeta` metaclass.
+
+    Usage example::
+
+        import abc
+        class ClassContainingAnAbstractAttribute(abc.ABC):
+            foo = AbstractAttribute('descriptive docstring for foo')
+
+    """
+    __isabstractmethod__ = True
+
+    def __init__(self, docstring=None):
+        if docstring is not None:
+            self.__doc__ = 'AbstractAttribute: ' + docstring
diff --git a/swh/loader/base/build.py b/swh/loader/base/build.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/base/build.py
@@ -0,0 +1,110 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import copy
+import logging
+import os
+
+import arrow
+
+
+logger = logging.getLogger(__name__)
+
+# TODO: this module is still work in progress
+
+# Static setup
+EPOCH = 0
+UTC_OFFSET = 0
+SWH_PERSON = {
+    'name': 'Software Heritage',
+    'fullname': 'Software Heritage',
+    'email': 'robot@softwareheritage.org'
+}
+REVISION_MESSAGE = 'swh-loader-tar: synthetic revision message'
+REVISION_TYPE = 'tar'
+
+
+def _time_from_last_modified(last_modified):
+    """Compute a timestamp from a last modification date.
+
+    Args:
+        last_modified (str): Last modification time
+
+    Returns:
+        dict representing a timestamp with keys {seconds, microseconds}
+
+    """
+    last_modified = arrow.get(last_modified)
+    mtime = last_modified.float_timestamp
+    normalized_time = list(map(int, str(mtime).split('.')))
+    return {
+        'seconds': normalized_time[0],
+        'microseconds': normalized_time[1]
+    }
+
+
+def compute_revision(tarpath, last_modified):
+    """Compute a synthetic revision.
+
+    Args:
+        tarpath (str): absolute path to the tarball
+        last_modified (str): time of last modification read from the
+            source remote (most probably by the lister)
+
+    Returns:
+        Revision as dict:
+        - date (dict): the modification timestamp as returned by
+          _time_from_last_modified
+        - committer_date: the modification timestamp as returned by
+          _time_from_last_modified
+        - author: cf. SWH_PERSON
+        - committer: cf. SWH_PERSON
+        - type: cf. REVISION_TYPE
+        - message: cf. REVISION_MESSAGE
+
+    """
+    ts = _time_from_last_modified(last_modified)
+
+    return {
+        'date': {
+            'timestamp': ts,
+            'offset': UTC_OFFSET,
+        },
+        'committer_date': {
+            'timestamp': ts,
+            'offset': UTC_OFFSET,
+        },
+        'author': SWH_PERSON,
+        'committer': SWH_PERSON,
+        'type': REVISION_TYPE,
+        'message': REVISION_MESSAGE,
+        'synthetic': True,
+    }
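+
+
+# Illustrative example (comment only, not executed): for
+# last_modified = '2018-01-01T00:00:00Z', compute_revision() would return
+# something like:
+#
+#     {
+#         'date': {'timestamp': {'seconds': 1514764800, 'microseconds': 0},
+#                  'offset': 0},
+#         'committer_date': {'timestamp': {'seconds': 1514764800,
+#                                          'microseconds': 0},
+#                            'offset': 0},
+#         'author': SWH_PERSON,
+#         'committer': SWH_PERSON,
+#         'type': 'tar',
+#         'message': 'swh-loader-tar: synthetic revision message',
+#         'synthetic': True,
+#     }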
+
+
+def set_original_artifact(*, revision, filepath, nature, hashes):
+    """Set the original artifact data on the given revision for
+    the tarball currently being loaded."""
+
+    revision = copy.deepcopy(revision)
+    if 'metadata' not in revision or not revision['metadata']:
+        revision['metadata'] = {}
+    if 'original_artifact' in revision['metadata']:
+        oa = revision['metadata']['original_artifact']
+        if oa:
+            logger.warning(
+                'Revision already contains original_artifact metadata, '
+                'replacing: %r',
+                oa,
+            )
+
+    revision['metadata']['original_artifact'] = [{
+        'name': os.path.basename(filepath),
+        'archive_type': nature,
+        **hashes,
+    }]
+
+    return revision
diff --git a/swh/loader/base/download.py b/swh/loader/base/download.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/base/download.py
@@ -0,0 +1,308 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import requests
+from .abstractattribute import AbstractAttribute
+
+try:
+    from _version import __version__
+except ImportError:
+    __version__ = 'devel'
+
+
+# This module contains the helper classes used to check which package
+# versions are already archived and to download the new ones.
+
+
+class If_Modified_Since:
+    """Check for already archived package versions using the HTTP
+    ``If-Modified-Since`` header.
+
+    """
+    def __init__(self):
+        self.session = requests.session()
+        self.params = {
+            'headers': {
+                'User-Agent': 'Software Heritage Loader (%s)' % (
+                    __version__
+                )
+            }
+        }
+
+    def known_artifacts(self, last_snapshot):
+        """Retrieve the known release versions for this package
+        (i.e. those already ingested into the archive).
+
+        Args:
+            last_snapshot (dict): Last snapshot for the visit
+
+        Returns:
+            dict: Dict whose keys are tarball urls and whose values are
+            lists [revision id, time of last visit].
+
+        """
+        if not last_snapshot or 'branches' not in last_snapshot:
+            return {}
+
+        # retrieve only revisions (e.g the alias we do not want here)
+        revs = [rev['target']
+                for rev in last_snapshot['branches'].values()
+                if rev and rev['target_type'] == 'revision']
+        known_revisions = self.storage.revision_get(revs)
+        ret = {}
+        for revision in known_revisions:
+            if not revision:  # revision_get can return None
+                continue
+            if 'original_artifact' in revision['metadata']:  # FIXME
+                artifact = revision['metadata']['original_artifact']
+                ret[artifact['url']] = [revision['id'],
+                                        artifact['time_last_visit']]
+        return ret
+
+    def filter_package_versions(self, tarballs, known_versions):
+        """Return the versions of the package that need to be loaded.
+
+        Args:
+            tarballs (list): dicts describing the package source tarballs,
+                each with at least a ``url`` entry
+            known_versions (dict): as returned by :meth:`known_artifacts`,
+                it enables to filter out versions already ingested in the
+                archive
+
+        Returns:
+            list: the subset of ``tarballs`` that is new or was modified
+            since the last visit, each entry enriched with a ``response``
+            key holding the server response for its url.
+
+        """
+        versions = []
+
+        for release in tarballs:
+            tarball_url = release['url']
+            if tarball_url in known_versions:
+                # conditional request: skip the version if the server
+                # reports it unchanged since the last visit
+                tarball_request = self._request(
+                    tarball_url,
+                    time_last_visit=known_versions[tarball_url][1],
+                    throw_error=False)
+
+                if tarball_request.status_code == 304:
+                    continue
+            else:
+                tarball_request = self._request(
+                    tarball_url, time_last_visit=None, throw_error=False)
+
+            if tarball_request.status_code == 404:
+                self.log.debug('Tarball url %s returns a 404 error.',
+                               tarball_url)
+                continue
+            release['response'] = tarball_request
+            versions.append(release)
+
+        return versions
+
+    def _request(self, url, time_last_visit, throw_error=True):
+        """Request the remote tarball url, sending an If-Modified-Since
+        header.
+
+        Args:
+            url (str): Url (file or http*)
+            time_last_visit: value of the If-Modified-Since header (None
+                to send an unconditional request)
+            throw_error (bool): raise on non-200 responses
+
+        Raises:
+            ValueError in case of failing to query
+
+        Returns:
+            server response
+
+        """
+        self.params['headers']['If-Modified-Since'] = time_last_visit
+        response = self.session.get(url, **self.params, stream=True)
+        if response.status_code != 200 and throw_error:
+            raise ValueError("Fail to query '%s'. Reason: %s" % (
+                url, response.status_code))
+        return response
+
+    def prepare_package_versions(self, tarballs, known_versions=None):
+        """Instantiate a generator that processes one released package
+        version at each iteration step. The following operations are
+        performed for each version:
+
+        1. Create a temporary directory to download and extract the
+           release tarball
+        2. Download the tarball
+        3. Check the downloaded tarball integrity
+        4. Uncompress the tarball
+        5. Extract the metadata associated with the package version
+
+        Args:
+            tarballs (list): the package source tarballs, as provided by
+                the loader
+            known_versions (dict): may be provided by the loader, it enables
+                to filter out versions already ingested in the archive.
+
+        Yields:
+            Tuple[dict, str]: tuples containing the following members:
+
+            * a dict holding package tarball information and metadata
+            * a string holding the path of the uncompressed package to
+              load into the archive
+
+        """
+        new_versions = self.filter_package_versions(tarballs, known_versions)
+        for package_source_data in new_versions:
+            tarball_request = package_source_data['response']
+
+            # To make things simple while creating revisions
+            del package_source_data['response']
+            yield self._prepare_package_version(package_source_data,
+                                                tarball_request)
+
+
+class compare_field:
+    """Check for already archived package versions using a field present
+    in the package metadata.
+
+    """
+
+    compare_field = AbstractAttribute("Field used to identify if the package "
+                                      "version was previously archived")
+    # e.g. for the pypi loader, compare_field = 'sha'
+
+    def __init__(self):
+        self.session = requests.session()
+        self.params = {
+            'headers': {
+                'User-Agent': 'Software Heritage Loader (%s)' % (
+                    __version__
+                )
+            }
+        }
+
+    def _request(self, url, throw_error=True):
+        """Request the remote tarball url.
+
+        Args:
+            url (str): Url (file or http*)
+            throw_error (bool): raise on non-200 responses
+
+        Raises:
+            ValueError in case of failing to query
+
+        Returns:
+            server response
+
+        """
+        response = self.session.get(url, **self.params, stream=True)
+        if response.status_code != 200 and throw_error:
+            raise ValueError("Fail to query '%s'. Reason: %s" % (
+                url, response.status_code))
+        return response
+
+    def known_artifacts(self, last_snapshot):
+        """Retrieve the known release versions for this package
+        (i.e. those already ingested into the archive).
+
+        Args:
+            last_snapshot (dict): Last snapshot for the visit
+
+        Returns:
+            dict: Dict whose keys are the compare_field values of the known
+            artifacts and whose values are revision ids.
+
+        """
+        if not last_snapshot or 'branches' not in last_snapshot:
+            return {}
+
+        # retrieve only revisions (e.g the alias we do not want here)
+        revs = [rev['target']
+                for rev in last_snapshot['branches'].values()
+                if rev and rev['target_type'] == 'revision']
+        known_revisions = self.storage.revision_get(revs)
+        ret = {}
+        for revision in known_revisions:
+            if not revision:  # revision_get can return None
+                continue
+            if 'original_artifact' in revision['metadata']:  # FIXME
+                artifact = revision['metadata']['original_artifact']
+                ret[artifact[self.compare_field]] = revision['id']
+        return ret
+
+    def filter_package_versions(self, tarballs, known_versions):
+        """Return the versions of the package that are not yet archived.
+
+        Args:
+            tarballs (list): dicts describing the package source tarballs,
+                each with at least the compare_field entry
+            known_versions (dict): as returned by :meth:`known_artifacts`,
+                it enables to filter out versions already ingested in the
+                archive
+
+        Returns:
+            list: the subset of ``tarballs`` whose compare_field value is
+            not in ``known_versions``.
+
+        """
+        versions = []
+
+        for release in tarballs:
+            if release[self.compare_field] in known_versions:
+                continue
+            versions.append(release)
+
+        return versions
+
+    def prepare_package_versions(self, tarballs, known_versions=None):
+        """Instantiate a generator that processes one released package
+        version at each iteration step. The following operations are
+        performed for each version:
+
+        1. Create a temporary directory to download and extract the
+           release tarball
+        2. Download the tarball
+        3. Check the downloaded tarball integrity
+        4. Uncompress the tarball
+        5. Extract the metadata associated with the package version
+
+        Args:
+            tarballs (list): the package source tarballs, as provided by
+                the loader
+            known_versions (dict): may be provided by the loader, it enables
+                to filter out versions already ingested in the archive.
+
+        Yields:
+            Tuple[dict, str]: tuples containing the following members:
+
+            * a dict holding package tarball information and metadata
+            * a string holding the path of the uncompressed package to
+              load into the archive
+
+        """
+        new_versions = self.filter_package_versions(tarballs, known_versions)
+        for package_source_data in new_versions:
+            # filter out versions with a missing tarball;
+            # the package visit will be marked as partial at the end of
+            # the loading process
+
+            tarball_url = package_source_data['url']
+            tarball_request = self._request(tarball_url,
+                                            throw_error=False)
+            if tarball_request.status_code == 404:
+                self.log.debug('Tarball url %s returns a 404 error.',
+                               tarball_url)
+                continue
+
+            yield self._prepare_package_version(package_source_data,
+                                                tarball_request)
diff --git a/swh/loader/base/loader.py b/swh/loader/base/loader.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/base/loader.py
@@ -0,0 +1,415 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import shutil
+
+from abc import abstractmethod
+from tempfile import mkdtemp
+
+from swh.core import tarball
+from .abstractattribute import AbstractAttribute
+from swh.loader.core.utils import clean_dangling_folders
+from swh.loader.core.loader import BufferedLoader
+from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE
+from swh.storage.algos.snapshot import snapshot_get_all_branches
+from swh.model.from_disk import Directory
+
+from .build import compute_revision, set_original_artifact
+
+from swh.model.identifiers import (
+    identifier_to_bytes, revision_identifier
+)
+
+DEBUG_MODE = '** DEBUG MODE **'
+
+
+class BaseLoader(BufferedLoader):
+    """Base class for package manager loaders.
+
+    Required Overrides:
+        loader_name
+        class_name
+        def convert_to_standard_format
+
+    Optional Overrides:
+        def cleanup_artifact
+        def extract_metadata
+
+    """
+
+    loader_name = AbstractAttribute("Name of the package manager")  # e.g. pypi
+    class_name = AbstractAttribute("Name of the loader class")  # e.g. PyPILoader
+
+    def __init__(self):
+        super().__init__(logging_class='swh.loader.%s.%s' % (self.loader_name,
+                                                             self.class_name))
+        self.TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.%s.' % self.loader_name
+        self.CONFIG_BASE_FILENAME = 'loader/%s' % self.loader_name
+
+        self.ADDITIONAL_CONFIG = {
+            'temp_directory': ('str',
+                               '/tmp/swh.loader.%s/' % self.loader_name),
+            'cache': ('bool', False),
+            'cache_dir': ('str', ''),
+            'debug': ('bool', False),  # NOT FOR PRODUCTION
+        }
+
+        self.local_cache = None
+        self.dir_path = None
+
+        temp_directory = self.config['temp_directory']
+        os.makedirs(temp_directory, exist_ok=True)
+
+        self.temp_directory = mkdtemp(
+            suffix='-%s' % os.getpid(),
+            prefix=self.TEMPORARY_DIR_PREFIX_PATTERN,
+            dir=temp_directory)
+
+        self.debug = self.config.get('debug', False)
+
+    @abstractmethod
+    def convert_to_standard_format(self, **kwargs):
+        """Convert the package information into a common format:
+
+            {
+                'name': <package name>,
+                'origin_url': <origin url>,
+                'tarballs': [
+                    {'url': <tarball url>},  # 'url' is mandatory
+                    ...
+                ],
+            }
+
+        The tarball dicts must not carry 'nature' or 'response' entries
+        (in particular for the if-modified-since strategy); those are
+        filled in during loading.
+        """
+        pass
+
+    def cleanup_artifact(self, uncompressed_path):
+        """Clean up unnecessary files from the uncompressed tarball;
+        also perform any loader specific operation if needed.
+
+        Returns the path of the package directory to load.
+        """
+        return uncompressed_path
+
+    def extract_metadata(self, package_path, package_source_data):
+        """Extract metadata from the downloaded and uncompressed package
+        and add it to package_source_data.
+        """
+        return package_source_data
+
+    # You probably don't need to override anything below this line.
+
+    def prepare_origin_visit(self, **kwargs):
+        """Prepare the origin visit for the package.
+
+        Args:
+            **kwargs: the arguments passed to the loader task, forwarded
+                to :meth:`convert_to_standard_format`
+
+        """
+        # reset statuses
+        self._load_status = 'uneventful'
+        self._visit_status = 'full'
+        self.done = False
+        # fetch the package information and normalize it
+        self.package_details = self.convert_to_standard_format(**kwargs)
+        self.origin = {
+            'url': self.package_details['origin_url'],
+            'type': self.loader_name,
+        }
+        self.visit_date = None  # loader core will populate it
+
+    def prepare(self, **kwargs):
+        """Prepare the effective loading of the source tarballs for the
+        package.
+
+        """
+        self.contents = []
+        self.directories = []
+        self.revisions = []
+        self.package_temp_dir = os.path.join(self.temp_directory,
+                                             self.package_details['name'])
+
+        last_snapshot = self.last_snapshot()
+        self.known_artifacts = self.known_artifacts(last_snapshot)
+
+        self.new_artifacts = \
+            self.prepare_package_versions(self.package_details['tarballs'],
+                                          self.known_artifacts)
+
+    def _prepare_package_version(self, package_source_data, tarball_request):
+        """Download and uncompress one package version and gather the
+        information needed to build its revision.
+        """
+        url = package_source_data['url']
+        tarball_path, hashes = self.generate_hash(tarball_request, url)
+        uncompressed_path = os.path.join(self.package_temp_dir, 'uncompressed',
+                                         url)  # SEE ME
+        package_source_data['nature'] = self.uncompress_tarball(
+            tarball_path, uncompressed_path)
+
+        # remove tarball
+        os.remove(tarball_path)
+
+        if self.tarball_invalid:
+            return None, None
+
+        # also perform any loader specific operation if needed
+        package_path = self.cleanup_artifact(uncompressed_path)
+        package_source_data = self.extract_metadata(package_path,
+                                                    package_source_data)
+        return package_source_data, package_path
+
+    def fetch_data(self):
+        """Called once per release artifact version (can be many for one
+        release).
+
+        This will, for each call:
+        - retrieve a release artifact (associated to a release version)
+        - uncompress it and compute the necessary information
+        - compute the swh objects
+
+        Returns:
+            True as long as there is data to fetch
+
+        """
+        data = None
+        if self.done:
+            return False
+
+        try:
+            data = next(self.new_artifacts)
+            self._load_status = 'eventful'
+        except StopIteration:
+            self.done = True
+            return False
+
+        package_source_data, dir_path = data
+        # package release tarball was corrupted
+        if self.tarball_invalid:
+            return not self.done
+
+        dir_path = dir_path.encode('utf-8')
+        directory = Directory.from_disk(path=dir_path, data=True)
+        objects = directory.collect()
+
+        if 'content' not in objects:
+            objects['content'] = {}
+        if 'directory' not in objects:
+            objects['directory'] = {}
+
+        self.contents = objects['content'].values()
+        self.directories = objects['directory'].values()
+
+        '''
+        useless
+        date = normalize_timestamp(
+            int(arrow.get(artifact['date']).timestamp))
+
+        name = release['name'].encode('utf-8')
+        message = release['message'].encode('utf-8')
+        if message:
+            message = b'%s: %s' % (name, message)
+        else:
+            message = name
+        '''
+        filepath = []  # FIX ME
+        nature = []
+        hashes = []
+        revision = self.build_revision(filepath, nature, hashes)  # FIX ME
+
+        '''
+        revision = {
+            'synthetic': True,  # ok
+            'metadata': {
+                'original_artifact': artifact,
+                'project': project_info,
+            },
+            'author': author,  # ok
+            'date': date,  # ok
+            'committer': author,  # ok
+            'committer_date': date,  # ok
+            'message': message,  # ok
+            'directory': directory.hash,
+            'parents': [],  # why is this needed
+            'type': 'tar',  # ok
+        }
+        '''
+
+        revision['id'] = identifier_to_bytes(
+            revision_identifier(revision))
+        self.revisions.append(revision)
+
+        # Change me to be compatible with if-modified-since
+        package_key = package_source_data[self.compare_field]  # check for this
+        self.known_artifacts[package_key] = revision['id']  # SEE ME
+
+        self.log.debug('Removing unpacked package files at %s', dir_path)
+        shutil.rmtree(dir_path)
+
+        return not self.done
+
+    def build_revision(self, filepath, nature, hashes):
+        """Build the revision with identifier.
+
+        We use the `last_modified` date provided by the caller to
+        build the revision.
+
+        """
+        return set_original_artifact(
+            revision=compute_revision(filepath, self.last_modified),
+            filepath=filepath,
+            nature=nature,
+            hashes=hashes,
+        )
+
+    def last_snapshot(self):
+        """Retrieve the last snapshot of the package if any.
+
+        """
+        visit = self.storage.origin_visit_get_latest(
+            self.origin['url'], require_snapshot=True)
+        if visit:
+            return snapshot_get_all_branches(self.storage, visit['snapshot'])
+
+    def store_data(self):
+        """Store fetched data in the database.
+
+        """
+        self.maybe_load_contents(self.contents)
+        self.maybe_load_directories(self.directories)
+        self.maybe_load_revisions(self.revisions)
+
+        if self.done:
+            self.generate_and_load_snapshot()
+            self.flush()
+
+    def generate_and_load_snapshot(self):
+        """Generate the snapshot from the loaded branches and send it to
+        the storage.
+
+        TODO: not implemented yet.
+        """
+        pass
+
+    def generate_hash(self, response, url):
+        """Store the tarball in the temporary directory and compute the
+        hashes of its content.
+
+        Args:
+            response (Response): Server response of the url
+            url (str): Url of the tarball
+
+        Returns:
+            Tuple of (local filepath, hashes of the file content)
+
+        """
+        length = int(response.headers['content-length'])
+
+        # SEE ME
+        filepath = os.path.join(self.package_temp_dir, os.path.basename(url))
+
+        h = MultiHash(length=length)
+        with open(filepath, 'wb') as f:
+            for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE):
+                h.update(chunk)
+                f.write(chunk)
+
+        actual_length = os.path.getsize(filepath)
+        if length != actual_length:
+            raise ValueError('Error when checking size: %s != %s' % (
+                length, actual_length))
+
+        hashes = {
+            'length': length,
+            **h.hexdigest()
+        }
+        return filepath, hashes
+
+    def uncompress_tarball(self, filepath, path):
+        """Uncompress a tarball.
+
+        Args:
+            filepath (str): Path of tarball to uncompress
+            path (str): The destination folder where to uncompress the tarball
+
+        Returns:
+            The nature of the tarball, zip or tar.
+
+        """
+        try:
+            self.tarball_invalid = False
+            return tarball.uncompress(filepath, path)
+        except Exception:
+            self.tarball_invalid = True
+            return None
+
+    def pre_cleanup(self):
+        """To prevent disk explosion if some other workers exploded
+        in mid-air (OOM killed), we try and clean up dangling files.
+
+        """
+        if self.debug:
+            self.log.warning('%s Will not pre-clean up temp dir %s' % (
+                DEBUG_MODE, self.temp_directory
+            ))
+            return
+        clean_dangling_folders(self.config['temp_directory'],
+                               pattern_check=self.TEMPORARY_DIR_PREFIX_PATTERN,
+                               log=self.log)
+
+    def flush(self):
+        """Flush any potential dangling data not sent to swh-storage.
+
+        Bypass the maybe_load_* methods which await the threshold-reached
+        signal. We actually want to store those as we are done
+        loading.
+
+        """
+        contents = self.contents
+        directories = self.directories
+        revisions = self.revisions
+        releases = self.releases
+
+        # and send those to storage if asked
+        if self.config['send_contents']:
+            self.send_batch_contents(contents)
+        if self.config['send_directories']:
+            self.send_batch_directories(directories)
+        if self.config['send_revisions']:
+            self.send_batch_revisions(revisions)
+        if self.config['send_releases']:
+            self.send_batch_releases(releases)
+        if self.config['send_snapshot'] and self.snapshot:
+            self.send_snapshot(self.snapshot)
+
+    def cleanup(self):
+        """Clean up temporary disk use after downloading and extracting
+        package tarballs.
+
+        """
+        if self.debug:
+            self.log.warning('%s Will not clean up temp dir %s' % (
+                DEBUG_MODE, self.temp_directory
+            ))
+            return
+        if os.path.exists(self.temp_directory):
+            self.log.debug('Clean up %s' % self.temp_directory)
+            shutil.rmtree(self.temp_directory)
diff --git a/swh/loader/base/tests/__init__.py b/swh/loader/base/tests/__init__.py
new file mode 100644
diff --git a/swh/loader/base/tests/test_download.py b/swh/loader/base/tests/test_download.py
new file mode 100644
diff --git a/swh/loader/base/tests/test_loader.py b/swh/loader/base/tests/test_loader.py
new file mode 100644
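For illustration only, a concrete loader built on top of these base classes could look like the sketch below. It is not part of this diff: the GNULoader name, the 'gnu' loader_name and the 'sha256' compare field are made-up examples of the overrides BaseLoader requires.

    from swh.loader.base.download import compare_field
    from swh.loader.base.loader import BaseLoader


    class GNULoader(compare_field, BaseLoader):
        loader_name = 'gnu'          # assumed package manager name
        class_name = 'GNULoader'
        compare_field = 'sha256'     # field used to detect known artifacts

        def __init__(self):
            # the download helper does not chain to super(), so both
            # bases are initialized explicitly
            compare_field.__init__(self)
            BaseLoader.__init__(self)

        def convert_to_standard_format(self, name, origin_url, tarballs):
            # tarballs: list of dicts with at least 'url' and 'sha256'
            return {
                'name': name,
                'origin_url': origin_url,
                'tarballs': tarballs,
            }

        def extract_metadata(self, package_path, package_source_data):
            # nothing extra to extract in this sketch
            return package_source_data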