Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/tar/loader.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os
import shutil
import tempfile
from tempfile import mkdtemp
from urllib.parse import urlparse

import requests

from swh.core import tarball
from swh.loader.core.loader import BufferedLoader
from swh.loader.dir.loader import revision_from, snapshot_from
from swh.model.from_disk import Directory
from swh.model.hashutil import MultiHash

from .build import compute_revision
try: | |||||
from _version import __version__ | |||||
except ImportError: | |||||
__version__ = 'devel' | |||||
TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.tar.' | |||||
DEBUG_MODE = '** DEBUG MODE **' | |||||
class LocalResponse:
    """Duck-typed stand-in for a ``requests`` response over a local file.

    Only the ``iter_content`` part of the response API used by the
    download code is implemented.

    Args:
        path (str): Path to the local file to stream

    """
    def __init__(self, path):
        self.path = path

    def iter_content(self, chunk_size=None):
        """Stream the file's bytes in chunks.

        Args:
            chunk_size (int): Maximum size of each yielded chunk; when
                None, a default buffer size is used.

        Yields:
            bytes: Successive chunks of the file's content

        """
        # The previous implementation iterated line by line, which both
        # ignored chunk_size and yielded arbitrarily sized chunks on
        # binary data; read fixed-size blocks instead.
        if chunk_size is None:
            chunk_size = 65536
        with open(self.path, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk
class ArchiveFetcher:
    """Http/Local client in charge of downloading archives from a
    remote/local server.

    Args:
        temp_directory (str): Path to the temporary disk location used
            for downloading the release artifacts

    """
    def __init__(self, temp_directory=None):
        self.temp_directory = temp_directory
        self.session = requests.session()
        self.params = {
            'headers': {
                'User-Agent': 'Software Heritage Tar Loader (%s)' % (
                    __version__
                )
            }
        }

    def download(self, url):
        """Download the remote tarball url locally.

        Args:
            url (str): Url (file or http*)

        Raises:
            ValueError in case of failing to query or in case of a size
            mismatch between the announced and the downloaded content

        Returns:
            Tuple of local (filepath, hashes of filepath)

        """
        if url.startswith('file://'):
            # str.strip('file:') removed *characters* (f, i, l, e, :)
            # from both ends of the url and could eat legitimate
            # trailing characters of the path; parse the url instead.
            path = urlparse(url).path
            response = LocalResponse(path)
            length = os.path.getsize(path)
        else:
            response = self.session.get(url, **self.params, stream=True)
            if response.status_code != 200:
                raise ValueError("Fail to query '%s'. Reason: %s" % (
                    url, response.status_code))
            length = int(response.headers['content-length'])

        filepath = os.path.join(self.temp_directory, os.path.basename(url))

        # Stream the payload to disk while hashing it on the fly.
        h = MultiHash(length=length)
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=None):
                h.update(chunk)
                f.write(chunk)

        # Guard against truncated or corrupted downloads.
        actual_length = os.path.getsize(filepath)
        if length != actual_length:
            raise ValueError('Error when checking size: %s != %s' % (
                length, actual_length))

        hashes = {
            'length': length,
            **h.hexdigest()
        }
        return filepath, hashes
class TarLoader(BufferedLoader):
    """Tarball loader implementation.

    This will:

    - create an origin (if it does not exist)
    - create a fetch_history entry
    - create an origin_visit
    - download the tarball and uncompress it locally in a temporary
      location
    - process the content of the tarball to persist on swh storage
    - clean up the temporary location
    - write an entry in fetch_history to mark the loading tarball end
      (success or failure)

    """
    CONFIG_BASE_FILENAME = 'loader/tar'

    ADDITIONAL_CONFIG = {
        'working_dir': ('string', '/tmp'),
        'debug': ('bool', False),  # NOT FOR PRODUCTION
    }

    def __init__(self, logging_class='swh.loader.tar.TarLoader', config=None):
        super().__init__(logging_class=logging_class, config=config)
        working_dir = self.config['working_dir']
        # A single makedirs call is enough (the previous version called
        # it twice, once without the 0o755 mode).
        os.makedirs(working_dir, 0o755, exist_ok=True)
        # Dedicated temporary tree: the downloaded archive goes directly
        # in temp_directory, the uncompressed tree goes in dir_path.
        self.temp_directory = mkdtemp(
            suffix='-%s' % os.getpid(),
            prefix=TEMPORARY_DIR_PREFIX_PATTERN,
            dir=working_dir)
        self.client = ArchiveFetcher(temp_directory=self.temp_directory)
        self.dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-',
                                         dir=self.temp_directory)
        self.debug = self.config['debug']

    def cleanup(self):
        """Clean up temporary disk folders used, unless running in debug
        mode (in which case the user is loudly warned that nothing is
        removed).

        """
        if self.debug:
            # warning (not debug) on purpose: leftover directories mess
            # up the local environment; log.warn is deprecated.
            self.log.warning('%s Will not clean up temp dir %s' % (
                DEBUG_MODE, self.temp_directory
            ))
            return
        if os.path.exists(self.temp_directory):
            self.log.debug('Clean up %s' % self.temp_directory)
            shutil.rmtree(self.temp_directory)

    def prepare_origin_visit(self, *, origin, visit_date=None, **kwargs):
        """Prepare the origin and visit date for the visit.

        Args:
            origin (dict): origin dictionary; its 'type' defaults to
                'tar' when absent
            visit_date (str): optional date the origin was visited

        """
        self.origin = origin
        if 'type' not in self.origin:  # let the type flow if present
            self.origin['type'] = 'tar'
        self.visit_date = visit_date

    def prepare(self, *args, **kwargs):
        """Keep the tarball's last modification time for later revision
        computation.

        last_modified is the time of last modification of the tarball,
        e.g. from https://ftp.gnu.org/gnu/8sync/:

            [ ] 8sync-0.1.0.tar.gz     2016-04-22 16:35  217K
            [ ] 8sync-0.1.0.tar.gz.sig 2016-04-22 16:35  543

        """
        self.last_modified = kwargs.get('last_modified')

    def fetch_data(self):
        """Retrieve and uncompress the archive, then compute the objects
        (contents, directories, revision, snapshot) to store.

        """
        # fetch the remote tarball locally
        url = self.origin['url']
        filepath, hashes = self.client.download(url)

        # uncompress it in the temporary uncompress directory
        self.log.info('Uncompress %s to %s' % (filepath, self.dir_path))
        nature = tarball.uncompress(filepath, self.dir_path)

        dir_path = self.dir_path.encode('utf-8')
        directory = Directory.from_disk(path=dir_path, save_path=True)
        objects = directory.collect()
        objects.setdefault('content', {})
        objects.setdefault('directory', {})

        # compute the full revision (with ids); the tarball's checksums
        # are recorded as the revision's original artifact metadata
        revision = {
            **compute_revision(filepath, self.last_modified),
            'metadata': {
                'original_artifact': [{
                    'name': os.path.basename(filepath),
                    'archive_type': nature,
                    **hashes,
                }],
            }
        }
        revision = revision_from(directory.hash, revision)
        objects['revision'] = {
            revision['id']: revision,
        }

        branch_name = os.path.basename(self.dir_path)
        snapshot = snapshot_from(revision['id'], branch_name)
        objects['snapshot'] = {
            snapshot['id']: snapshot
        }
        self.objects = objects

    def store_data(self):
        """Persist the objects computed by fetch_data in the storage."""
        objects = self.objects
        self.maybe_load_contents(objects['content'].values())
        self.maybe_load_directories(objects['directory'].values())
        self.maybe_load_revisions(objects['revision'].values())
        # exactly one snapshot is built per visit
        snapshot = next(iter(objects['snapshot'].values()))
        self.maybe_load_snapshot(snapshot)
Why not self.log.debug(...)?