Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/loader.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import logging | |||||
import tempfile | import tempfile | ||||
import os | import os | ||||
from typing import Generator, Dict, Tuple, Sequence | from typing import Generator, Dict, Tuple, Sequence | ||||
from swh.core.tarball import uncompress | from swh.core.tarball import uncompress | ||||
from swh.core.config import SWHConfig | from swh.core.config import SWHConfig | ||||
from swh.model.from_disk import Directory | from swh.model.from_disk import Directory | ||||
from swh.model.identifiers import ( | from swh.model.identifiers import ( | ||||
revision_identifier, snapshot_identifier, identifier_to_bytes | revision_identifier, snapshot_identifier, identifier_to_bytes | ||||
) | ) | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.loader.core.converters import content_for_storage | from swh.loader.core.converters import content_for_storage | ||||
logger = logging.getLogger(__name__) | |||||
# Not implemented yet: | # Not implemented yet: | ||||
# - disk clean-up routines for previously killed workers (e.g. when OOM-killed) | # - disk clean-up routines for previously killed workers (e.g. when OOM-killed) | ||||
# -> separation of concerns suggests abstracting this away from the loader code | # -> separation of concerns suggests abstracting this away from the loader code | ||||
# -> experience tells us this is complicated to do (T903, T964, T982, | # -> experience tells us this is complicated to do (T903, T964, T982, | ||||
# etc...) | # etc...) | ||||
# | # | ||||
# - splitting into groups too many objects sent to storage > could be a > | # - splitting into groups too many objects sent to storage > could be a > | ||||
# -> specialized collaborator or storage implementation or proxy which deals | # -> specialized collaborator or storage implementation or proxy which deals | ||||
▲ Show 20 Lines • Show All 132 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | ||||
information from step 0., generate a snapshot and load it into the | information from step 0., generate a snapshot and load it into the | ||||
Software Heritage archive | Software Heritage archive | ||||
""" | """ | ||||
status_load = 'uneventful' # either: eventful, uneventful, failed | status_load = 'uneventful' # either: eventful, uneventful, failed | ||||
status_visit = 'partial' # either: partial, full | status_visit = 'partial' # either: partial, full | ||||
tmp_revisions = {} | tmp_revisions = {} | ||||
try: | |||||
# Prepare origin and origin_visit | # Prepare origin and origin_visit | ||||
origin = {'url': self.url} | origin = {'url': self.url} | ||||
self.storage.origin_add([origin]) | self.storage.origin_add([origin]) | ||||
visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
visit_id = self.storage.origin_visit_add( | visit_id = self.storage.origin_visit_add( | ||||
origin=self.url, | origin=self.url, | ||||
date=visit_date, | date=visit_date, | ||||
type=self.visit_type)['visit'] | type=self.visit_type)['visit'] | ||||
# Retrieve the default release (the "latest" one) | # Retrieve the default release (the "latest" one) | ||||
default_release = self.get_default_release() | default_release = self.get_default_release() | ||||
# FIXME: Add load exceptions handling | # FIXME: Add load exceptions handling | ||||
for version in self.get_versions(): # for each | for version in self.get_versions(): # for each | ||||
tmp_revisions[version] = [] | tmp_revisions[version] = [] | ||||
# `a_` stands for `artifact_` | # `a_` stands for `artifact_` | ||||
for a_filename, a_uri, a_metadata in self.get_artifacts(version): | for a_filename, a_uri, a_metadata in self.get_artifacts( | ||||
version): | |||||
with tempfile.TemporaryDirectory() as tmpdir: | with tempfile.TemporaryDirectory() as tmpdir: | ||||
a_path, a_computed_metadata = self.fetch_artifact_archive( | # a_c_: archive_computed_ | ||||
a_path, a_c_metadata = self.fetch_artifact_archive( | |||||
a_uri, dest=tmpdir) | a_uri, dest=tmpdir) | ||||
uncompressed_path = os.path.join(tmpdir, 'src') | uncompressed_path = os.path.join(tmpdir, 'src') | ||||
uncompress(a_path, dest=uncompressed_path) | uncompress(a_path, dest=uncompressed_path) | ||||
directory = Directory.from_disk( | directory = Directory.from_disk( | ||||
path=uncompressed_path.encode('utf-8'), data=True) | path=uncompressed_path.encode('utf-8'), data=True) | ||||
# FIXME: Try not to load the full raw content in memory | # FIXME: Try not to load the full raw content in memory | ||||
objects = directory.collect() | objects = directory.collect() | ||||
contents = objects['content'].values() | contents = objects['content'].values() | ||||
self.storage.content_add( | self.storage.content_add( | ||||
map(content_for_storage, contents)) | map(content_for_storage, contents)) | ||||
status_load = 'eventful' | status_load = 'eventful' | ||||
directories = objects['directory'].values() | directories = objects['directory'].values() | ||||
self.storage.directory_add(directories) | self.storage.directory_add(directories) | ||||
# FIXME: This should be release. cf. D409 discussion | # FIXME: This should be release. cf. D409 discussion | ||||
revision = self.build_revision( | revision = self.build_revision( | ||||
a_metadata, uncompressed_path) | a_metadata, uncompressed_path) | ||||
revision.update({ | revision.update({ | ||||
'type': 'tar', | 'type': 'tar', | ||||
'synthetic': True, | 'synthetic': True, | ||||
'directory': directory.hash, | 'directory': directory.hash, | ||||
}) | }) | ||||
revision['metadata'].update({ | revision['metadata'].update({ | ||||
'original_artifact': a_metadata, | 'original_artifact': a_metadata, | ||||
'hashes_artifact': a_computed_metadata | 'hashes_artifact': a_c_metadata | ||||
}) | }) | ||||
revision['id'] = identifier_to_bytes( | revision['id'] = identifier_to_bytes( | ||||
revision_identifier(revision)) | revision_identifier(revision)) | ||||
self.storage.revision_add([revision]) | self.storage.revision_add([revision]) | ||||
tmp_revisions[version].append({ | tmp_revisions[version].append({ | ||||
'filename': a_filename, | 'filename': a_filename, | ||||
'target': revision['id'], | 'target': revision['id'], | ||||
}) | }) | ||||
# Build and load the snapshot | # Build and load the snapshot | ||||
branches = {} | branches = {} | ||||
for version, v_branches in tmp_revisions.items(): | for version, v_branches in tmp_revisions.items(): | ||||
if len(v_branches) == 1: | if len(v_branches) == 1: | ||||
branch_name = ('releases/%s' % version).encode('utf-8') | branch_name = ('releases/%s' % version).encode('utf-8') | ||||
if version == default_release: | if version == default_release: | ||||
branches[b'HEAD'] = { | branches[b'HEAD'] = { | ||||
'target_type': 'alias', | 'target_type': 'alias', | ||||
'target': branch_name, | 'target': branch_name, | ||||
} | } | ||||
branches[branch_name] = { | branches[branch_name] = { | ||||
'target_type': 'revision', | 'target_type': 'revision', | ||||
'target': v_branches[0]['target'], | 'target': v_branches[0]['target'], | ||||
} | } | ||||
else: | else: | ||||
for x in v_branches: | for x in v_branches: | ||||
branch_name = ('releases/%s/%s' % ( | branch_name = ('releases/%s/%s' % ( | ||||
version, v_branches['filename'])).encode('utf-8') | version, v_branches['filename'])).encode('utf-8') | ||||
branches[branch_name] = { | branches[branch_name] = { | ||||
'target_type': 'revision', | 'target_type': 'revision', | ||||
'target': x['target'], | 'target': x['target'], | ||||
} | } | ||||
snapshot = { | snapshot = { | ||||
'branches': branches | 'branches': branches | ||||
} | } | ||||
snapshot['id'] = identifier_to_bytes( | snapshot['id'] = identifier_to_bytes( | ||||
snapshot_identifier(snapshot)) | snapshot_identifier(snapshot)) | ||||
self.storage.snapshot_add([snapshot]) | self.storage.snapshot_add([snapshot]) | ||||
# having come this far, we actually reached a full visit | # having come this far, we actually reached a full visit | ||||
status_visit = 'full' | status_visit = 'full' | ||||
# Update the visit's state | # Update the visit's state | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
origin=self.url, visit_id=visit_id, status=status_visit, | origin=self.url, visit_id=visit_id, status=status_visit, | ||||
snapshot=snapshot) | snapshot=snapshot) | ||||
except ValueError as e: | |||||
logger.warning('Fail to load %s. Reason: %s' % (self.url, e)) | |||||
finally: | |||||
return {'status': status_load} | return {'status': status_load} |