swh/loader/package/loader.py
# (13 lines elided)
from swh.core.tarball import uncompress
from swh.core.config import SWHConfig
from swh.model.from_disk import Directory
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import (
    revision_identifier, snapshot_identifier, identifier_to_bytes
)
from swh.model.model import Sha1Git
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_all_branches

from swh.loader.core.converters import prepare_contents
from swh.loader.package.utils import download


logger = logging.getLogger(__name__)


# Not implemented yet:
# (255 lines elided; the hunk below is inside def load(self) -> Dict:)
            logger.debug('version: %s', version)
            tmp_revisions[version] = []
            # `p_` stands for `package_`
            for branch_name, p_info in self.get_package_info(version):
                logger.debug('package_info: %s', p_info)
                revision_id = self.resolve_revision_from(
                    known_artifacts, p_info['raw'])
                if revision_id is None:
                    revision_id, loaded = self._load_revision(p_info, origin)
                    if loaded:
                        status_load = 'eventful'
                    else:
                        status_visit = 'partial'
                    if revision_id is None:
                        continue
                tmp_revisions[version].append((branch_name, revision_id))
        logger.debug('tmp_revisions: %s', tmp_revisions)
        # Build and load the snapshot
        branches = {}  # type: Dict[bytes, Mapping[str, Any]]
        for version, branch_name_revisions in tmp_revisions.items():
            if version == default_version and \
                    len(branch_name_revisions) == 1:
# (34 lines elided, still inside def load(self) -> Dict:)
            origin=self.url, visit_id=visit_id, status=status_visit,
            snapshot=snapshot and snapshot['id'])
        result = {
            'status': status_load,
        }  # type: Dict[str, Any]
        if snapshot:
            result['snapshot_id'] = hash_to_hex(snapshot['id'])
        return result
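
# For context, a hedged sketch of the dict load() hands back when a
# snapshot was created. Only 'eventful' is visible as a status value in
# this hunk, and the hex digest below is a made-up placeholder, not data
# from this diff:
#
#     result = {
#         'status': 'eventful',
#         'snapshot_id': '0123456789abcdef0123456789abcdef01234567',
#     }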
    def _load_revision(self, p_info, origin) -> Tuple[
            Optional[Sha1Git], bool]:
        """Does all the loading of a revision itself:

        * downloads a package and uncompresses it
        * loads it from disk
        * adds contents, directories, and revision to self.storage
        * returns (revision_id, loaded): revision_id is None if the
          artifact could not be retrieved or has no intrinsic metadata;
          loaded is False only in the retrieval-failure case

        """
        with tempfile.TemporaryDirectory() as tmpdir:
            try:
                dl_artifacts = self.download_package(p_info, tmpdir)
            except Exception:
                logger.exception('Unable to retrieve %s', p_info)
                return (None, False)

            uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
            logger.debug('uncompressed_path: %s', uncompressed_path)
            directory = Directory.from_disk(
                path=uncompressed_path.encode('utf-8'), data=True)  # noqa
            # FIXME: Try not to load the full raw content in memory
            objects = directory.collect()

            contents, skipped_contents = prepare_contents(
                objects.get('content', {}).values(),
                max_content_size=self.max_content_size,
                origin_url=origin['url'])
            self.storage.skipped_content_add(skipped_contents)
            logger.debug('Number of skipped contents: %s',
                         len(skipped_contents))
            self.storage.content_add(contents)
            logger.debug('Number of contents: %s', len(contents))

            directories = list(objects.get('directory', {}).values())
            logger.debug('Number of directories: %s', len(directories))
            self.storage.directory_add(directories)

            # FIXME: This should be release. cf. D409
            revision = self.build_revision(p_info['raw'], uncompressed_path)
            if not revision:
                # Some artifacts are missing intrinsic metadata;
                # skip those
                return (None, True)

            revision.update({
                'synthetic': True,
                'directory': directory.hash,
            })
            revision['metadata'].update({
                'original_artifact': [
                    hashes for _, hashes in dl_artifacts
                ],
            })
            revision['id'] = identifier_to_bytes(
                revision_identifier(revision))
            logger.debug('Revision: %s', revision)
            self.storage.revision_add([revision])
            return (revision['id'], True)
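
# The (revision_id, loaded) contract is easy to misread, so here is a
# minimal, self-contained sketch of how load() above maps the three
# possible outcomes onto visit/load status. The helper name
# interpret_load_result is hypothetical and not part of this diff:

from typing import Optional, Tuple


def interpret_load_result(result: Tuple[Optional[bytes], bool]) -> str:
    # Mirrors the branching in load() above, for illustration only.
    revision_id, loaded = result
    if not loaded:
        # (None, False): download_package raised, visit becomes 'partial'
        return 'visit partial'
    if revision_id is None:
        # (None, True): no intrinsic metadata, artifact skipped
        return 'artifact skipped'
    # (revision_id, True): revision stored, branch gets recorded
    return 'branch recorded'


assert interpret_load_result((None, False)) == 'visit partial'
assert interpret_load_result((None, True)) == 'artifact skipped'
assert interpret_load_result((b'\x00' * 20, True)) == 'branch recorded'

# Note that the (None, True) case still marks the load 'eventful', which
# matches the previous inline behavior: contents and directories were
# already added before the missing-metadata check hit its `continue`.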