Changeset View
Standalone View
swh/loader/package/loader.py
  # Copyright (C) 2019-2020  The Software Heritage developers
  # See the AUTHORS file at the top-level directory of this distribution
  # License: GNU General Public License version 3, or any later version
  # See top-level LICENSE file for more information

  import datetime
  import logging
  import tempfile
  import os

  from typing import (
      Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple
  )

+ import attr

  from swh.core.tarball import uncompress
  from swh.core.config import SWHConfig
- from swh.model.from_disk import Directory
+ from swh.model import from_disk
  from swh.model.hashutil import hash_to_hex
- from swh.model.identifiers import (
-     revision_identifier, snapshot_identifier, identifier_to_bytes
- )
- from swh.model.model import Sha1Git
+ from swh.model.model import (
+     BaseModel, Sha1Git,
+     Content, SkippedContent, Directory,
+     Revision,
+     TargetType, Snapshot,
+     Origin
+ )
  from swh.storage import get_storage
  from swh.storage.algos.snapshot import snapshot_get_all_branches
- from swh.loader.core.converters import prepare_contents
  from swh.loader.package.utils import download

  logger = logging.getLogger(__name__)
  # Not implemented yet:
  # - clean up disk routines from previous killed workers (when OOMkilled)
  #   -> separation of concern would like this to be abstracted from the code
  #   -> experience tells us it's complicated to do as such (T903, T964, T982,
  #      etc...)
  #
  # - model: swh.model.merkle.from_disk should output swh.model.model.* objects
  #   to avoid this layer's conversion routine call
  #   -> Take this up within swh.model's current implementation

ardumont (inline comment): Those 3 lines can be dropped, as that's taken care of here ;)
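For context on that remark: the conversion the comment refers to now goes through swh.model.from_disk's to_model() helper, as used further down in _load_revision. A minimal standalone sketch of that pattern, assuming a placeholder path and size cap (not part of the diff):

# Illustrative sketch only: walk a directory with swh.model.from_disk and
# convert each node to a swh.model.model object, as _load_revision does below.
# b'/tmp/some-unpacked-artifact' is a placeholder path; the 10 MiB cap stands
# in for self.max_content_size.
from swh.model import from_disk
from swh.model.model import Content, Directory, SkippedContent

tree = from_disk.Directory.from_disk(
    path=b'/tmp/some-unpacked-artifact',
    max_content_length=10 * 1024 * 1024)

for node in tree.iter_tree():
    obj = node.to_model()  # from_disk node -> swh.model.model object
    if isinstance(obj, Content):
        print('content', obj.sha1_git.hex())
    elif isinstance(obj, (SkippedContent, Directory)):
        print(type(obj).__name__, obj)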
  class PackageLoader:
      # Origin visit type (str) set by the loader
      visit_type = ''

      def __init__(self, url):
          """Loader's constructor. This raises exception if the minimal required

[… 41 lines folded … context: def get_package_info(self, version: str) -> Generator[ …]
          Returns:
              (branch name, package metadata)

          """
          yield from {}
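To make the (branch name, package metadata) contract concrete, here is a hypothetical subclass implementation. The p_info layout ('url', 'filename', 'raw'), the branch naming scheme, and self.artifacts_per_version are illustrative assumptions, not the API of any actual loader:

# Hypothetical get_package_info() for a concrete loader, yielding one
# (branch_name, p_info) pair per downloadable artifact of the requested
# version. Generator/Tuple/Mapping/Any come from the typing import at the
# top of this module.
def get_package_info(self, version: str) -> Generator[
        Tuple[str, Mapping[str, Any]], None, None]:
    for artifact in self.artifacts_per_version.get(version, []):
        p_info = {
            'url': artifact['archive_url'],   # where to download from
            'filename': artifact['filename'],
            'raw': artifact,                  # extrinsic metadata, kept as-is
        }
        yield f'releases/{version}/{artifact["filename"]}', p_info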
-     def build_revision(
-             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
-         """Build the revision dict from the archive metadata (extrinsic
+     def build_revision(
+             self, a_metadata: Dict, uncompressed_path: str,
+             directory: Sha1Git) -> Optional[Revision]:
+         """Build the revision from the archive metadata (extrinsic
          artifact metadata) and the intrinsic metadata.

          Args:
              a_metadata: Artifact metadata
              uncompressed_path: Artifact uncompressed path on disk

          Returns:
              SWH data dict

          """
-         return {}
+         raise NotImplementedError('build_revision')
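With the new signature above (and anticipating ardumont's later inline comment about returning a Revision object directly), a subclass would build and return a swh.model.model.Revision. A hedged sketch; every field value is a placeholder that real loaders derive from a_metadata and the intrinsic metadata:

# Hypothetical build_revision() for a concrete loader; all values are
# illustrative. Person and RevisionType are additional swh.model.model
# classes, imported here explicitly since the module above does not.
from typing import Dict, Optional
from swh.model.model import Person, Revision, RevisionType, Sha1Git

def build_revision(
        self, a_metadata: Dict, uncompressed_path: str,
        directory: Sha1Git) -> Optional[Revision]:
    if 'version' not in a_metadata:   # assumed "missing intrinsic metadata"
        return None                   # -> the caller skips this artifact
    author = Person(
        fullname=b'Jane Doe <jdoe@example.org>',
        name=b'Jane Doe',
        email=b'jdoe@example.org')
    return Revision(
        type=RevisionType.TAR,
        message=a_metadata['version'].encode(),
        author=author,
        committer=author,
        date=None,             # real loaders fill these from the metadata
        committer_date=None,
        directory=directory,   # Sha1Git computed by _load_revision
        synthetic=True,
        parents=[],
        metadata={'extrinsic': {'raw': a_metadata}},
    )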
      def get_default_version(self) -> str:
          """Retrieve the latest release version if any.

          Returns:
              Latest version

          """
          return ''
-     def last_snapshot(self) -> Optional[Dict]:
+     def last_snapshot(self) -> Optional[Snapshot]:
          """Retrieve the last snapshot

          """
          snapshot = None
          visit = self.storage.origin_visit_get_latest(
              self.url, require_snapshot=True)
-         if visit:
-             snapshot = snapshot_get_all_branches(
-                 self.storage, visit['snapshot'])
+         if visit and visit.get('snapshot'):
+             snapshot = Snapshot.from_dict(snapshot_get_all_branches(
+                 self.storage, visit['snapshot']))
          return snapshot
-     def known_artifacts(self, snapshot: Optional[Dict]) -> Dict:
+     def known_artifacts(
+             self, snapshot: Optional[Snapshot]) -> Dict[Sha1Git, BaseModel]:
          """Retrieve the known releases/artifact for the origin.

          Args
              snapshot: snapshot for the visit

          Returns:
              Dict of keys revision id (bytes), values a metadata Dict.

          """
-         if not snapshot or 'branches' not in snapshot:
+         if not snapshot:
              return {}

          # retrieve only revisions (e.g the alias we do not want here)
-         revs = [rev['target']
-                 for rev in snapshot['branches'].values()
-                 if rev and rev['target_type'] == 'revision']
+         revs = [rev.target
+                 for rev in snapshot.branches.values()
+                 if rev and rev.target_type == TargetType.REVISION]
          known_revisions = self.storage.revision_get(revs)

          ret = {}
          for revision in known_revisions:
              if not revision:  # revision_get can return None
                  continue
              ret[revision['id']] = revision['metadata']
[… 99 lines folded … context: def load(self) -> Dict: …]
""" | """ | ||||
status_load = 'uneventful' # either: eventful, uneventful, failed | status_load = 'uneventful' # either: eventful, uneventful, failed | ||||
status_visit = 'full' # either: partial, full | status_visit = 'full' # either: partial, full | ||||
tmp_revisions = {} # type: Dict[str, List] | tmp_revisions = {} # type: Dict[str, List] | ||||
snapshot = None | snapshot = None | ||||
# Prepare origin and origin_visit | # Prepare origin and origin_visit | ||||
origin = {'url': self.url} | origin = Origin(url=self.url) | ||||
try: | try: | ||||
self.storage.origin_add_one(origin) | self.storage.origin_add_one(origin) | ||||
visit_id = self.storage.origin_visit_add( | visit_id = self.storage.origin_visit_add( | ||||
origin=self.url, | origin=self.url, | ||||
date=self.visit_date, | date=self.visit_date, | ||||
type=self.visit_type)['visit'] | type=self.visit_type)['visit'] | ||||
except Exception as e: | except Exception: | ||||
logger.error( | logger.exception('Failed to create origin/origin_visit:') | ||||
'Failed to create origin/origin_visit. Reason: %s', e) | |||||
return {'status': 'failed'} | return {'status': 'failed'} | ||||
try: | try: | ||||
last_snapshot = self.last_snapshot() | last_snapshot = self.last_snapshot() | ||||
logger.debug('last snapshot: %s', last_snapshot) | logger.debug('last snapshot: %s', last_snapshot) | ||||
known_artifacts = self.known_artifacts(last_snapshot) | known_artifacts = self.known_artifacts(last_snapshot) | ||||
logger.debug('known artifacts: %s', known_artifacts) | logger.debug('known artifacts: %s', known_artifacts) | ||||
Show All 38 Lines | def load(self) -> Dict: | ||||
              }
              for branch_name, target in branch_name_revisions:
                  branches[branch_name.encode('utf-8')] = {
                      'target_type': 'revision',
                      'target': target,
                  }
-             snapshot = {
-                 'branches': branches
-             }
-             logger.debug('snapshot: %s', snapshot)
-             snapshot['id'] = identifier_to_bytes(
-                 snapshot_identifier(snapshot))
+             snapshot_data = {
+                 'branches': branches
+             }
+             logger.debug('snapshot: %s', snapshot_data)
+             snapshot = Snapshot.from_dict(snapshot_data)

ardumont (inline comment): Why not directly make it a snapshot object and then use its compute_hash method? That would make us use less swh.model code.
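A minimal illustration of what the suggestion amounts to, and of what the updated right-hand code now does: building a swh.model.model.Snapshot yields its id without calling snapshot_identifier by hand. The branch target below is a made-up 20-byte value, not real data:

# Illustrative only: the snapshot id comes from the model object itself.
# b'\x00' * 20 stands in for a real revision id.
from swh.model.model import Snapshot

snapshot = Snapshot.from_dict({
    'branches': {
        b'releases/1.0.0': {
            'target_type': 'revision',
            'target': b'\x00' * 20,
        },
    },
})
print(snapshot.id.hex())  # computed when the object is built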
              logger.debug('snapshot: %s', snapshot)
              self.storage.snapshot_add([snapshot])
              if hasattr(self.storage, 'flush'):
                  self.storage.flush()
          except Exception:
              logger.exception('Fail to load %s' % self.url)
              status_visit = 'partial'
              status_load = 'failed'
          finally:
              self.storage.origin_visit_update(
                  origin=self.url, visit_id=visit_id, status=status_visit,
-                 snapshot=snapshot and snapshot['id'])
+                 snapshot=snapshot and snapshot.id)
          result = {
              'status': status_load,
          }  # type: Dict[str, Any]
          if snapshot:
-             result['snapshot_id'] = hash_to_hex(snapshot['id'])
+             result['snapshot_id'] = hash_to_hex(snapshot.id)
          return result
      def _load_revision(self, p_info, origin) -> Tuple[Optional[Sha1Git], bool]:
          """Does all the loading of a revision itself:

          * downloads a package and uncompresses it
          * loads it from disk
          * adds contents, directories, and revision to self.storage
          * returns (revision_id, loaded)

          """
          with tempfile.TemporaryDirectory() as tmpdir:
              try:
                  dl_artifacts = self.download_package(p_info, tmpdir)
              except Exception:
                  logger.exception('Unable to retrieve %s',
                                   p_info)
                  return (None, False)

              uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
              logger.debug('uncompressed_path: %s', uncompressed_path)

-             directory = Directory.from_disk(
-                 path=uncompressed_path.encode('utf-8'),
-                 data=True)  # noqa
+             directory = from_disk.Directory.from_disk(
+                 path=uncompressed_path.encode('utf-8'),
+                 max_content_length=self.max_content_size)
-             # FIXME: Try not to load the full raw content in
-             # memory
-             objects = directory.collect()
-
-             contents, skipped_contents = prepare_contents(
-                 objects.get('content', {}).values(),
-                 max_content_size=self.max_content_size,
-                 origin_url=origin['url'])
-             self.storage.skipped_content_add(skipped_contents)
-             logger.debug('Number of skipped contents: %s',
-                          len(skipped_contents))
-             self.storage.content_add(contents)
-             logger.debug('Number of contents: %s', len(contents))
-
-             directories = list(
-                 objects.get('directory', {}).values())
-             logger.debug('Number of directories: %s', len(directories))
-             self.storage.directory_add(directories)
+             contents: List[Content] = []
+             skipped_contents: List[SkippedContent] = []
+             directories: List[Directory] = []
+
+             for obj in directory.iter_tree():
+                 obj = obj.to_model()
+                 if isinstance(obj, Content):
+                     # FIXME: read the data from disk later (when the
+                     # storage buffer is flushed).
+                     obj = obj.with_data()
+                     contents.append(obj)
+                 elif isinstance(obj, SkippedContent):
+                     skipped_contents.append(obj)
+                 elif isinstance(obj, Directory):
+                     directories.append(obj)
+                 else:
+                     raise TypeError(
+                         f'Unexpected content type from disk: {obj}')
+
+             logger.debug('Number of skipped contents: %s',
+                          len(skipped_contents))
+             self.storage.skipped_content_add(skipped_contents)
+             logger.debug('Number of contents: %s', len(contents))
+             self.storage.content_add(contents)
+
+             logger.debug('Number of directories: %s', len(directories))
+             self.storage.directory_add(directories)
              # FIXME: This should be release. cf. D409
-             revision = self.build_revision(p_info['raw'], uncompressed_path)
+             revision = self.build_revision(
+                 p_info['raw'], uncompressed_path, directory=directory.hash)
              if not revision:
                  # Some artifacts are missing intrinsic metadata
                  # skipping those
                  return (None, True)

ardumont (inline comment): wondering whether the build_revision (old name) should return a Revision object directly (i know..., more work...).

-             revision.update({
-                 'synthetic': True,
-                 'directory': directory.hash,
-             })
-             revision['metadata'].update({
+             metadata = revision.metadata or {}
+             metadata.update({
                  'original_artifact': [
                      hashes for _, hashes in dl_artifacts
                  ],
              })
-             revision['id'] = identifier_to_bytes(
-                 revision_identifier(revision))
+             revision = attr.evolve(revision, metadata=metadata)

ardumont (inline comment): Use Revision.compute_hash?

              logger.debug('Revision: %s', revision)
              self.storage.revision_add([revision])
-         return (revision['id'], True)
+         return (revision.id, True)
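One note on the attr.evolve call above: swh.model.model objects are frozen attrs classes, so the revision's fields cannot be reassigned in place; attr.evolve returns a new instance with the replaced field. A tiny generic illustration of that pattern; Example is a made-up class, not an swh.model type:

# Generic attrs illustration of the evolve-instead-of-mutate pattern.
import attr

@attr.s(frozen=True)
class Example:
    metadata = attr.ib(type=dict)

e1 = Example(metadata={'a': 1})
e2 = attr.evolve(e1, metadata={**e1.metadata, 'original_artifact': ['...']})
assert e1.metadata == {'a': 1}            # original instance is unchanged
assert 'original_artifact' in e2.metadata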